Skip to content

Commit

Permalink
feat: implement multiple read single write for sqlite (#3568)
Browse files Browse the repository at this point in the history
Description
---
- Implemented a generic system-wide pool connection manager for diesel SQLite connections that uses the modern Write-ahead Log (WAL) SQLite3 database mode to speed up database operations  (_see https://www.sqlite.org/wal.html_) with a configurable pool size.
- Changed the Wallet's SQLite database connection to make use of the pool connection manager.
- Refined SQLite profiling trace logs to only log an entry if the total time is > 1ms - this cut down on noisy entries as most database calls are now < 1ms.
- Notable SQLite pragma settings:
  - [PRAGMA schema.journal_mode](https://www.sqlite.org/pragma.html#pragma_journal_mode) = `WAL`
  - [PRAGMA schema.locking_mode](https://www.sqlite.org/pragma.html#pragma_locking_mode) = `NORMAL `
  - [PRAGMA schema.synchronous](https://www.sqlite.org/pragma.html#pragma_synchronous) = `| 2 | FULL`
  - [PRAGMA wal_autocheckpoint](https://www.sqlite.org/pragma.html#pragma_wal_autocheckpoint) = `1000` (the default)

Motivation and Context
---
The wallet's database connection was not optimal.

How Has This Been Tested?
---
- Unit tests
- Cucumber tests (`npm test -- --tags "not @long-running and not @broken"`)
- System-level tests with positive results all around (see discussion):
  - Configuration: Base node + console wallet + merge mining proxy + XMRig -> Mining
  - Test 1: Moderate coin split (22 transactions producing 3124 outputs)
  - Test 2: Moderate stress test (2x simultaneous `make-it-rain` transactions from one sender to two receiving wallets of 250 transactions each)

The graphs below show the SQLite WAL mode profiling for the above two tests (sender wallet only) with a pool size of 16 for all read and write database operations where the total time (acquire lock time + db operation time) was larger than 1ms. The wallet in question had a communications breakdown to its connected base node for a while after all transactions were completed and when it eventually gained an RPC connection validation protocols for all 500 transactions queried the base node as fast as possible, which panned out in some of those protocols waiting for a pooled connection. The actual time db operations will wait for a write operation to conclude is managed by SQLite and bunched within the `db op time` measurement.

![image](https://user-images.githubusercontent.com/39146854/141734409-34b268d1-0b6c-4cc9-91d1-6933b244ee3d.png)

![image](https://user-images.githubusercontent.com/39146854/141733462-583eb0eb-e7ed-4c8c-bfb3-48e3acd4e800.png)
  • Loading branch information
hansieodendaal authored Nov 15, 2021
1 parent 11b8afa commit 8d22164
Show file tree
Hide file tree
Showing 29 changed files with 1,471 additions and 944 deletions.
31 changes: 31 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"comms",
"comms/dht",
"comms/rpc_macros",
"common_sqlite",
"infrastructure/shutdown",
"infrastructure/storage",
"infrastructure/test_utils",
Expand Down
5 changes: 3 additions & 2 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub async fn init_wallet(
// test encryption by initializing with no passphrase...
let db_path = config.console_wallet_db_file.clone();

let result = initialize_sqlite_database_backends(db_path.clone(), None);
let result = initialize_sqlite_database_backends(db_path.clone(), None, config.wallet_connection_manager_pool_size);
let (backends, wallet_encrypted) = match result {
Ok(backends) => {
// wallet is not encrypted
Expand All @@ -281,7 +281,8 @@ pub async fn init_wallet(
Err(WalletStorageError::NoPasswordError) => {
// get supplied or prompt password
let passphrase = get_or_prompt_password(arg_password.clone(), config.console_wallet_password.clone())?;
let backends = initialize_sqlite_database_backends(db_path, passphrase)?;
let backends =
initialize_sqlite_database_backends(db_path, passphrase, config.wallet_connection_manager_pool_size)?;

(backends, true)
},
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tari_p2p = { version = "^0.21", path = "../p2p", features = ["auto-update"] }
tari_service_framework = { version = "^0.21", path = "../service_framework" }
tari_shutdown = { version = "^0.21", path = "../../infrastructure/shutdown" }
tari_storage = { version = "^0.21", path = "../../infrastructure/storage" }
tari_common_sqlite = { path = "../../common_sqlite" }

aes-gcm = "^0.8"
async-trait = "0.1.50"
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/src/contacts_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::contacts_service::storage::database::DbKey;
use crate::{contacts_service::storage::database::DbKey, error::WalletStorageError};
use diesel::result::Error as DieselError;
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;
Expand Down Expand Up @@ -50,8 +50,8 @@ pub enum ContactsServiceStorageError {
ValueNotFound(DbKey),
#[error("Unexpected result error: `{0}`")]
UnexpectedResult(String),
#[error("R2d2 error")]
R2d2Error,
#[error("Diesel R2d2 error: `{0}`")]
DieselR2d2Error(#[from] WalletStorageError),
#[error("Diesel error: `{0}`")]
DieselError(#[from] DieselError),
#[error("Diesel connection error: `{0}`")]
Expand Down
40 changes: 23 additions & 17 deletions base_layer/wallet/src/contacts_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,22 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::convert::TryFrom;

use diesel::{prelude::*, result::Error as DieselError, SqliteConnection};
use tari_crypto::tari_utilities::ByteArray;

use tari_common_types::types::PublicKey;

use crate::{
contacts_service::{
error::ContactsServiceStorageError,
storage::database::{Contact, ContactsBackend, DbKey, DbKeyValuePair, DbValue, WriteOperation},
},
schema::contacts,
storage::sqlite_utilities::WalletDbConnection,
storage::sqlite_utilities::wallet_db_connection::WalletDbConnection,
util::diesel_ext::ExpectedRowsExtension,
};
use diesel::{prelude::*, result::Error as DieselError, SqliteConnection};
use std::convert::TryFrom;
use tari_common_types::types::PublicKey;
use tari_crypto::tari_utilities::ByteArray;

/// A Sqlite backend for the Output Manager Service. The Backend is accessed via a connection pool to the Sqlite file.
#[derive(Clone)]
Expand All @@ -47,10 +50,10 @@ impl ContactsServiceSqliteDatabase {

impl ContactsBackend for ContactsServiceSqliteDatabase {
fn fetch(&self, key: &DbKey) -> Result<Option<DbValue>, ContactsServiceStorageError> {
let conn = self.database_connection.acquire_lock();
let conn = self.database_connection.get_pooled_connection()?;

let result = match key {
DbKey::Contact(pk) => match ContactSql::find(&pk.to_vec(), &(*conn)) {
DbKey::Contact(pk) => match ContactSql::find(&pk.to_vec(), &conn) {
Ok(c) => Some(DbValue::Contact(Box::new(Contact::try_from(c)?))),
Err(ContactsServiceStorageError::DieselError(DieselError::NotFound)) => None,
Err(e) => return Err(e),
Expand All @@ -67,21 +70,21 @@ impl ContactsBackend for ContactsServiceSqliteDatabase {
}

fn write(&self, op: WriteOperation) -> Result<Option<DbValue>, ContactsServiceStorageError> {
let conn = self.database_connection.acquire_lock();
let conn = self.database_connection.get_pooled_connection()?;

match op {
WriteOperation::Upsert(kvp) => match kvp {
DbKeyValuePair::Contact(k, c) => match ContactSql::find(&k.to_vec(), &(*conn)) {
DbKeyValuePair::Contact(k, c) => match ContactSql::find(&k.to_vec(), &conn) {
Ok(found_c) => {
let _ = found_c.update(UpdateContact { alias: Some(c.alias) }, &(*conn))?;
let _ = found_c.update(UpdateContact { alias: Some(c.alias) }, &conn)?;
},
Err(_) => {
ContactSql::from(c).commit(&conn)?;
},
},
},
WriteOperation::Remove(k) => match k {
DbKey::Contact(k) => match ContactSql::find(&k.to_vec(), &(*conn)) {
DbKey::Contact(k) => match ContactSql::find(&k.to_vec(), &conn) {
Ok(c) => {
c.delete(&conn)?;
return Ok(Some(DbValue::Contact(Box::new(Contact::try_from(c)?))));
Expand Down Expand Up @@ -181,20 +184,23 @@ pub struct UpdateContact {

#[cfg(test)]
mod test {
use crate::contacts_service::storage::{
database::Contact,
sqlite_db::{ContactSql, UpdateContact},
};
use std::convert::TryFrom;

use diesel::{Connection, SqliteConnection};
use rand::rngs::OsRng;
use std::convert::TryFrom;
use tari_common_types::types::{PrivateKey, PublicKey};
use tari_crypto::{
keys::{PublicKey as PublicKeyTrait, SecretKey as SecretKeyTrait},
tari_utilities::ByteArray,
};

use tari_common_types::types::{PrivateKey, PublicKey};
use tari_test_utils::{paths::with_temp_dir, random::string};

use crate::contacts_service::storage::{
database::Contact,
sqlite_db::{ContactSql, UpdateContact},
};

#[test]
fn test_crud() {
with_temp_dir(|dir_path| {
Expand Down
11 changes: 9 additions & 2 deletions base_layer/wallet/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use diesel::result::Error as DieselError;
use log::SetLoggerError;
use serde_json::Error as SerdeJsonError;
use tari_common::exit_codes::ExitCodes;
use tari_common_sqlite::error::SqliteStorageError;
use tari_comms::{
connectivity::ConnectivityError,
multiaddr,
Expand Down Expand Up @@ -113,8 +114,8 @@ pub enum WalletStorageError {
DbPathDoesNotExist,
#[error("Serde json error: `{0}`")]
SerdeJsonError(#[from] SerdeJsonError),
#[error("R2d2 error")]
R2d2Error,
#[error("Diesel R2d2 error: `{0}`")]
DieselR2d2Error(#[from] SqliteStorageError),
#[error("Diesel error: `{0}`")]
DieselError(#[from] DieselError),
#[error("Diesel connection error: `{0}`")]
Expand Down Expand Up @@ -169,3 +170,9 @@ impl From<WalletStorageError> for ExitCodes {
}
}
}

impl PartialEq for WalletStorageError {
fn eq(&self, other: &Self) -> bool {
self == other
}
}
6 changes: 3 additions & 3 deletions base_layer/wallet/src/output_manager_service/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::base_node_service::error::BaseNodeServiceError;
use crate::{base_node_service::error::BaseNodeServiceError, error::WalletStorageError};
use diesel::result::Error as DieselError;
use tari_common::exit_codes::ExitCodes;
use tari_comms::{connectivity::ConnectivityError, peer_manager::node_id::NodeIdError, protocol::rpc::RpcError};
Expand Down Expand Up @@ -142,8 +142,8 @@ pub enum OutputManagerStorageError {
OutputAlreadySpent,
#[error("Key Manager not initialized")]
KeyManagerNotInitialized,
#[error("R2d2 error")]
R2d2Error,
#[error("Diesel R2d2 error: `{0}`")]
DieselR2d2Error(#[from] WalletStorageError),
#[error("Transaction error: `{0}`")]
TransactionError(#[from] TransactionError),
#[error("Diesel error: `{0}`")]
Expand Down
Loading

0 comments on commit 8d22164

Please sign in to comment.