Skip to content

Commit

Permalink
feat: optimize get transactions query (#3496)
Browse files Browse the repository at this point in the history
Description
---
- Optimized the get transactions query (`broadcast_all_completed_transactions`) for transactions that need to be broadcast/rebroadcast by sending a single diesel SQL query that only returns the result, instead of multiple queries that return all the transactions in the database with filtering and selection in the Rust code.
- Added a new unit test 'test_get_tranactions_to_be_rebroadcast'.

Motivation and Context
---
It is much more efficient to have a SQL query perform the filtering upfront.

How Has This Been Tested?
---
Unit tests, cucumber tests.
  • Loading branch information
hansieodendaal authored Oct 26, 2021
1 parent 996c047 commit e651a60
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 16 deletions.
29 changes: 15 additions & 14 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1625,12 +1625,14 @@ where
}

trace!(target: LOG_TARGET, "Restarting transaction broadcast protocols");
self.broadcast_all_completed_transactions(broadcast_join_handles)
self.broadcast_completed_and_broadcast_transactions(broadcast_join_handles)
.await
.map_err(|resp| {
error!(
target: LOG_TARGET,
"Error broadcasting all completed transactions: {:?}", resp
"Error broadcasting all valid and not cancelled Completed Transactions with status 'Completed' \
and 'Broadcast': {:?}",
resp
);
resp
})?;
Expand Down Expand Up @@ -1682,22 +1684,21 @@ where
Ok(())
}

/// Go through all completed transactions that have not yet been broadcast and broadcast all of them to the base
/// Broadcast all valid and not cancelled completed transactions with status 'Completed' and 'Broadcast' to the base
/// node.
async fn broadcast_all_completed_transactions(
async fn broadcast_completed_and_broadcast_transactions(
&mut self,
join_handles: &mut FuturesUnordered<JoinHandle<Result<u64, TransactionServiceProtocolError>>>,
) -> Result<(), TransactionServiceError> {
trace!(target: LOG_TARGET, "Attempting to Broadcast all Completed Transactions");
let completed_txs = self.db.get_completed_transactions().await?;
for (_, completed_tx) in completed_txs {
if completed_tx.valid &&
(completed_tx.status == TransactionStatus::Completed ||
completed_tx.status == TransactionStatus::Broadcast) &&
!completed_tx.is_coinbase()
{
self.broadcast_completed_transaction(completed_tx, join_handles).await?;
}
trace!(
target: LOG_TARGET,
"Attempting to Broadcast all valid and not cancelled Completed Transactions with status 'Completed' and \
'Broadcast'"
);
let txn_list = self.db.get_transactions_to_be_broadcast().await?;
for completed_txn in txn_list {
self.broadcast_completed_transaction(completed_txn, join_handles)
.await?;
}

Ok(())
Expand Down
7 changes: 7 additions & 0 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub trait TransactionBackend: Send + Sync + Clone {

fn fetch_unconfirmed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;

fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError>;

/// Check if a record with the provided key exists in the backend.
fn contains(&self, key: &DbKey) -> Result<bool, TransactionStorageError>;
/// Modify the state the of the backend with a write operation
Expand Down Expand Up @@ -424,6 +426,11 @@ where T: TransactionBackend + 'static
self.db.fetch_unconfirmed_transactions()
}

/// This method returns all completed transactions that must be broadcast
pub async fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.db.get_transactions_to_be_broadcast()
}

pub async fn get_completed_transaction_cancelled_or_not(
&self,
tx_id: TxId,
Expand Down
106 changes: 106 additions & 0 deletions base_layer/wallet/src/transaction_service/storage/sqlite_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,42 @@ impl TransactionBackend for TransactionServiceSqliteDatabase {
Ok(result)
}

fn get_transactions_to_be_broadcast(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
let acquire_lock = start.elapsed();
let txs = completed_transactions::table
.filter(completed_transactions::valid.eq(true as i32))
.filter(
completed_transactions::status
.eq(TransactionStatus::Completed as i32)
.or(completed_transactions::status.eq(TransactionStatus::Broadcast as i32)),
)
.filter(
completed_transactions::coinbase_block_height
.is_null()
.or(completed_transactions::coinbase_block_height.eq(0)),
)
.filter(completed_transactions::cancelled.eq(false as i32))
.order_by(completed_transactions::tx_id)
.load::<CompletedTransactionSql>(&*conn)?;

let mut result = vec![];
for mut tx in txs {
self.decrypt_if_necessary(&mut tx)?;
result.push(tx.try_into()?);
}
trace!(
target: LOG_TARGET,
"sqlite profile - get_transactions_to_be_broadcast: lock {} + db_op {} = {} ms",
acquire_lock.as_millis(),
(start.elapsed() - acquire_lock).as_millis(),
start.elapsed().as_millis()
);

Ok(result)
}

fn mark_all_transactions_as_unvalidated(&self) -> Result<(), TransactionStorageError> {
let start = Instant::now();
let conn = self.database_connection.acquire_lock();
Expand Down Expand Up @@ -2362,4 +2398,74 @@ mod test {
assert!(db3.fetch(&DbKey::PendingOutboundTransactions).is_ok());
assert!(db3.fetch(&DbKey::CompletedTransactions).is_ok());
}

#[test]
fn test_get_tranactions_to_be_rebroadcast() {
let db_name = format!("{}.sqlite3", string(8).as_str());
let temp_dir = tempdir().unwrap();
let db_folder = temp_dir.path().to_str().unwrap().to_string();
let db_path = format!("{}{}", db_folder, db_name);

embed_migrations!("./migrations");
let conn = SqliteConnection::establish(&db_path).unwrap_or_else(|_| panic!("Error connecting to {}", db_path));

embedded_migrations::run_with_output(&conn, &mut std::io::stdout()).expect("Migration failed");

for i in 0..1000 {
let (valid, cancelled, status, coinbase_block_height) = match i % 13 {
0 => (true, i % 3 == 0, TransactionStatus::Completed, None),
1 => (true, i % 5 == 0, TransactionStatus::Broadcast, None),
2 => (true, i % 7 == 0, TransactionStatus::Completed, Some(i % 2)),
3 => (true, i % 11 == 0, TransactionStatus::Broadcast, Some(i % 2)),
4 => (i % 13 == 0, false, TransactionStatus::Completed, None),
5 => (i % 17 == 0, false, TransactionStatus::Broadcast, None),
6 => (true, false, TransactionStatus::Pending, None),
7 => (true, false, TransactionStatus::Coinbase, None),
8 => (true, false, TransactionStatus::MinedUnconfirmed, None),
9 => (true, false, TransactionStatus::Imported, None),
10 => (true, false, TransactionStatus::MinedConfirmed, None),
_ => (true, false, TransactionStatus::Completed, Some(i)),
};
let completed_tx = CompletedTransaction {
tx_id: i,
source_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
destination_public_key: PublicKey::from_secret_key(&PrivateKey::random(&mut OsRng)),
amount: MicroTari::from(100),
fee: MicroTari::from(100),
transaction: Transaction::new(
vec![],
vec![],
vec![],
PrivateKey::random(&mut OsRng),
PrivateKey::random(&mut OsRng),
),
status,
message: "Yo!".to_string(),
timestamp: Utc::now().naive_utc(),
cancelled,
direction: TransactionDirection::Unknown,
coinbase_block_height,
send_count: 0,
last_send_timestamp: None,
valid,
confirmations: None,
mined_height: None,
mined_in_block: None,
};
let completed_tx_sql = CompletedTransactionSql::try_from(completed_tx).unwrap();
completed_tx_sql.commit(&conn).unwrap();
}

let connection = WalletDbConnection::new(conn, None);
let db1 = TransactionServiceSqliteDatabase::new(connection, None);

let txn_list = db1.get_transactions_to_be_broadcast().unwrap();
assert_eq!(db1.get_transactions_to_be_broadcast().unwrap().len(), 185);
for txn in &txn_list {
assert!(txn.status == TransactionStatus::Completed || txn.status == TransactionStatus::Broadcast);
assert!(txn.valid);
assert!(!txn.cancelled);
assert!(txn.coinbase_block_height == None || txn.coinbase_block_height == Some(0));
}
}
}
17 changes: 17 additions & 0 deletions base_layer/wallet/tests/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,23 @@ async fn test_txo_validation() {
.await
.unwrap();

// This is needed on a fast computer, otherwise the balance have not been updated correctly yet with the next step
let mut event_stream = oms.get_event_stream();
let delay = sleep(Duration::from_secs(10));
tokio::pin!(delay);
loop {
tokio::select! {
event = event_stream.recv() => {
if let OutputManagerEvent::TxoValidationSuccess(_) = &*event.unwrap(){
break;
}
},
() = &mut delay => {
break;
},
}
}

let balance = oms.get_balance().await.unwrap();
assert_eq!(
balance.available_balance,
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/features/WalletCli.feature
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Feature: Wallet CLI
# TODO: base node connection.
And I wait 30 seconds
And I stop wallet SENDER
And I make it rain from wallet SENDER 1 tx / sec 10 sec 8000 uT 100 increment to RECEIVER via command line
And I make it rain from wallet SENDER 1 tx per sec 10 sec 8000 uT 100 increment to RECEIVER via command line
Then wallet SENDER has at least 10 transactions that are all TRANSACTION_STATUS_BROADCAST and valid
Then wallet RECEIVER has at least 10 transactions that are all TRANSACTION_STATUS_BROADCAST and valid
And mining node MINE mines 5 blocks
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/features/support/steps.js
Original file line number Diff line number Diff line change
Expand Up @@ -3547,7 +3547,7 @@ When(
);

Then(
"I make it rain from wallet {word} {int} tx / sec {int} sec {int} uT {int} increment to {word} via command line",
"I make it rain from wallet {word} {int} tx per sec {int} sec {int} uT {int} increment to {word} via command line",
{ timeout: 300 * 1000 },
async function (sender, freq, duration, amount, amount_inc, receiver) {
let wallet = this.getWallet(sender);
Expand Down

0 comments on commit e651a60

Please sign in to comment.