Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add wallet ffi shutdown tests #6007

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion base_layer/contacts/src/contacts_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where T: ContactsBackend + 'static
let shutdown = self
.shutdown_signal
.take()
.expect("Output Manager Service initialized without shutdown signal");
.expect("Contacts Service initialized without shutdown signal");
pin_mut!(shutdown);

// Add all contacts as monitored peers to the liveness service
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tari_key_manager = { path = "../key_manager" }
tari_common_types = { path = "../../base_layer/common_types"}
tari_test_utils = { path = "../../infrastructure/test_utils"}
tari_service_framework = { path = "../../base_layer/service_framework" }
tari_core = { path = "../../base_layer/core", default-features = false, features = ["base_node"] }
borsh = "1.2"

[build-dependencies]
Expand Down
308 changes: 308 additions & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8249,8 +8249,22 @@ pub unsafe extern "C" fn emoji_set_destroy(emoji_set: *mut EmojiSet) {
pub unsafe extern "C" fn wallet_destroy(wallet: *mut TariWallet) {
if !wallet.is_null() {
let mut w = Box::from_raw(wallet);
let wallet_comms = w.wallet.comms.clone();
w.shutdown.trigger();
w.runtime.block_on(w.wallet.wait_until_shutdown());
// The wallet should be shutdown by now; these are just additional confirmations
loop {
if w.shutdown.is_triggered() &&
wallet_comms.shutdown_signal().is_triggered() &&
w.runtime
.block_on(wallet_comms.connectivity().get_connectivity_status())
.is_err()
{
break;
};
w.runtime
.block_on(async { tokio::time::sleep(Duration::from_millis(250)).await });
}
}
}

Expand Down Expand Up @@ -8575,6 +8589,7 @@ mod test {
use once_cell::sync::Lazy;
use tari_common_types::{emoji, transaction::TransactionStatus, types::PrivateKey};
use tari_comms::peer_manager::PeerFeatures;
use tari_contacts::contacts_service::types::{Direction, Message, MessageMetadata};
use tari_core::{
covenant,
transactions::{
Expand All @@ -8583,6 +8598,7 @@ mod test {
},
};
use tari_key_manager::{mnemonic::MnemonicLanguage, mnemonic_wordlists};
use tari_p2p::initialization::MESSAGING_PROTOCOL_ID;
use tari_script::script;
use tari_test_utils::random;
use tempfile::tempdir;
Expand Down Expand Up @@ -11046,4 +11062,296 @@ mod test {
let _spending_key = Box::from_raw(spending_key_ptr);
}
}

#[test]
#[allow(clippy::too_many_lines)]
pub fn test_wallet_shutdown() {
unsafe {
let mut error = 0;
let error_ptr = &mut error as *mut c_int;
let mut recovery_in_progress = false;
let recovery_in_progress_ptr = &mut recovery_in_progress as *mut bool;

// Create a new wallet for Alice
let db_name = CString::new(random::string(8).as_str()).unwrap();
let alice_db_name_str: *const c_char = CString::into_raw(db_name) as *const c_char;
let temp_dir = tempdir().unwrap();
let db_path = CString::new(temp_dir.path().to_str().unwrap()).unwrap();
let alice_db_path_str: *const c_char = CString::into_raw(db_path) as *const c_char;
let alice_transport_type = transport_memory_create();
let address = transport_memory_get_address(alice_transport_type, error_ptr);
let address_str = CStr::from_ptr(address).to_str().unwrap().to_owned();
let alice_address_str = CString::new(address_str).unwrap().into_raw() as *const c_char;
let network = CString::new(NETWORK_STRING).unwrap();
let alice_network_str: *const c_char = CString::into_raw(network) as *const c_char;

let alice_config = comms_config_create(
alice_address_str,
alice_transport_type,
alice_db_name_str,
alice_db_path_str,
20,
10800,
error_ptr,
);
let passphrase: *const c_char = CString::into_raw(CString::new("niao").unwrap()) as *const c_char;
let alice_wallet_ptr = wallet_create(
alice_config,
ptr::null(),
0,
0,
passphrase,
ptr::null(),
alice_network_str,
received_tx_callback,
received_tx_reply_callback,
received_tx_finalized_callback,
broadcast_callback,
mined_callback,
mined_unconfirmed_callback,
scanned_callback,
scanned_unconfirmed_callback,
transaction_send_result_callback,
tx_cancellation_callback,
txo_validation_complete_callback,
contacts_liveness_data_updated_callback,
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
base_node_state_callback,
recovery_in_progress_ptr,
error_ptr,
);
assert_eq!(error, 0);
string_destroy(alice_network_str as *mut c_char);
string_destroy(alice_db_name_str as *mut c_char);
string_destroy(alice_db_path_str as *mut c_char);
string_destroy(alice_address_str as *mut c_char);
transport_config_destroy(alice_transport_type);
comms_config_destroy(alice_config);

// Create a new wallet for bob
let db_name = CString::new(random::string(8).as_str()).unwrap();
let bob_db_name_str: *const c_char = CString::into_raw(db_name) as *const c_char;
let temp_dir = tempdir().unwrap();
let db_path = CString::new(temp_dir.path().to_str().unwrap()).unwrap();
let bob_db_path_str: *const c_char = CString::into_raw(db_path) as *const c_char;
let bob_transport_type = transport_memory_create();
let address = transport_memory_get_address(bob_transport_type, error_ptr);
let address_str = CStr::from_ptr(address).to_str().unwrap().to_owned();
let bob_address_str = CString::new(address_str).unwrap().into_raw() as *const c_char;
let network = CString::new(NETWORK_STRING).unwrap();
let bob_network_str: *const c_char = CString::into_raw(network) as *const c_char;

let bob_config = comms_config_create(
bob_address_str,
bob_transport_type,
bob_db_name_str,
bob_db_path_str,
20,
10800,
error_ptr,
);
let passphrase: *const c_char = CString::into_raw(CString::new("niao").unwrap()) as *const c_char;
let bob_wallet_ptr = wallet_create(
bob_config,
ptr::null(),
0,
0,
passphrase,
ptr::null(),
bob_network_str,
received_tx_callback,
received_tx_reply_callback,
received_tx_finalized_callback,
broadcast_callback,
mined_callback,
mined_unconfirmed_callback,
scanned_callback,
scanned_unconfirmed_callback,
transaction_send_result_callback,
tx_cancellation_callback,
txo_validation_complete_callback,
contacts_liveness_data_updated_callback,
balance_updated_callback,
transaction_validation_complete_callback,
saf_messages_received_callback,
connectivity_status_callback,
base_node_state_callback,
recovery_in_progress_ptr,
error_ptr,
);
assert_eq!(error, 0);
string_destroy(bob_network_str as *mut c_char);
string_destroy(bob_db_name_str as *mut c_char);
string_destroy(bob_db_path_str as *mut c_char);
string_destroy(bob_address_str as *mut c_char);
transport_config_destroy(bob_transport_type);
comms_config_destroy(bob_config);

// Add some peers
// - Wallet peer for Alice (add Bob as a base node peer; not how it will be done in production but good
// enough for the test as we just need to make sure the wallet can connect to a peer)
let bob_wallet_comms = (*bob_wallet_ptr).wallet.comms.clone();
let bob_node_identity = bob_wallet_comms.node_identity();
let bob_peer_public_key_ptr = Box::into_raw(Box::new(bob_node_identity.public_key().clone()));
let bob_peer_address_ptr =
CString::into_raw(CString::new(bob_node_identity.first_public_address().unwrap().to_string()).unwrap())
as *const c_char;
wallet_add_base_node_peer(
alice_wallet_ptr,
bob_peer_public_key_ptr,
bob_peer_address_ptr,
error_ptr,
);
string_destroy(bob_peer_address_ptr as *mut c_char);
let _destroyed = Box::from_raw(bob_peer_public_key_ptr);
// - Wallet peer for Bob (add Alice as a base node peer; same as above)
let alice_wallet_comms = (*alice_wallet_ptr).wallet.comms.clone();
let alice_node_identity = alice_wallet_comms.node_identity();
let alice_peer_public_key_ptr = Box::into_raw(Box::new(alice_node_identity.public_key().clone()));
let alice_peer_address_ptr = CString::into_raw(
CString::new(alice_node_identity.first_public_address().unwrap().to_string()).unwrap(),
) as *const c_char;
wallet_add_base_node_peer(
bob_wallet_ptr,
alice_peer_public_key_ptr,
alice_peer_address_ptr,
error_ptr,
);
string_destroy(alice_peer_address_ptr as *mut c_char);
let _destroyed = Box::from_raw(alice_peer_public_key_ptr);

// Add some contacts
// - Contact for Alice
let bob_wallet_address = TariWalletAddress::new(bob_node_identity.public_key().clone(), Network::LocalNet);
let alice_contact_alias_ptr: *const c_char =
CString::into_raw(CString::new("bob").unwrap()) as *const c_char;
let alice_contact_address_ptr = Box::into_raw(Box::new(bob_wallet_address.clone()));
let alice_contact_ptr = contact_create(alice_contact_alias_ptr, alice_contact_address_ptr, true, error_ptr);
tari_address_destroy(alice_contact_address_ptr);
assert!(wallet_upsert_contact(alice_wallet_ptr, alice_contact_ptr, error_ptr));
contact_destroy(alice_contact_ptr);
// - Contact for Bob
let alice_wallet_address =
TariWalletAddress::new(alice_node_identity.public_key().clone(), Network::LocalNet);
let bob_contact_alias_ptr: *const c_char =
CString::into_raw(CString::new("alice").unwrap()) as *const c_char;
let bob_contact_address_ptr = Box::into_raw(Box::new(alice_wallet_address.clone()));
let bob_contact_ptr = contact_create(bob_contact_alias_ptr, bob_contact_address_ptr, true, error_ptr);
tari_address_destroy(bob_contact_address_ptr);
assert!(wallet_upsert_contact(bob_wallet_ptr, bob_contact_ptr, error_ptr));
contact_destroy(bob_contact_ptr);

// Use comms service - do `dial_peer` for both wallets (we do not 'assert!' here to not make the test flaky)
// Note: This loop is just to make sure we actually connect as the first attempts do not always succeed
let alice_wallet_runtime = &(*alice_wallet_ptr).runtime;
let bob_wallet_runtime = &(*bob_wallet_ptr).runtime;
let mut alice_dialed_bob = false;
let mut bob_dialed_alice = false;
let mut dial_count = 0;
loop {
dial_count += 1;
if !alice_dialed_bob {
alice_dialed_bob = alice_wallet_runtime
.block_on(
alice_wallet_comms
.connectivity()
.dial_peer(bob_node_identity.node_id().clone()),
)
.is_ok();
}
if !bob_dialed_alice {
bob_dialed_alice = bob_wallet_runtime
.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
)
.is_ok();
}
if alice_dialed_bob && bob_dialed_alice || dial_count > 10 {
break;
}
// Wait a bit before the next attempt
alice_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(500)).await });
}

// Use contacts service - send some messages for both wallets
let mut alice_wallet_contacts_service = (*alice_wallet_ptr).wallet.contacts_service.clone();
let mut bob_wallet_contacts_service = (*bob_wallet_ptr).wallet.contacts_service.clone();
let mut alice_msg_count = 0;
let mut bob_msg_count = 0;
// Note: This loop is just to make sure we actually send a couple of messages as the first attempts do not
// always succeed (we do not 'assert!' here to not make the test flaky)
for i in 0..60 {
if alice_msg_count < 5 {
let alice_message_result =
alice_wallet_runtime.block_on(alice_wallet_contacts_service.send_message(Message {
body: vec![i],
metadata: vec![MessageMetadata::default()],
address: bob_wallet_address.clone(),
direction: Direction::Outbound,
stored_at: u64::from(i),
delivery_confirmation_at: None,
read_confirmation_at: None,
message_id: vec![i],
}));
if alice_message_result.is_ok() {
alice_msg_count += 1;
}
}
if bob_msg_count < 5 {
let bob_message_result =
bob_wallet_runtime.block_on(bob_wallet_contacts_service.send_message(Message {
body: vec![i],
metadata: vec![MessageMetadata::default()],
address: alice_wallet_address.clone(),
direction: Direction::Outbound,
stored_at: u64::from(i),
delivery_confirmation_at: None,
read_confirmation_at: None,
message_id: vec![i],
}));
if bob_message_result.is_ok() {
bob_msg_count += 1;
}
}
if alice_msg_count >= 5 && bob_msg_count >= 5 {
break;
}
// Wait a bit before the next attempt
alice_wallet_runtime.block_on(async { tokio::time::sleep(Duration::from_millis(500)).await });
}

// Trigger Alice wallet shutdown (same as `pub unsafe extern "C" fn wallet_destroy(wallet: *mut TariWallet)`
wallet_destroy(alice_wallet_ptr);

// Bob's peer connection to Alice will still be active for a short while until Bob figures out Alice is
// gone, and a 'dial_peer' command to Alice from Bob may return the previous connection state, but it
// should not be possible to do anything with the connection.
let bob_comms_dial_peer = bob_wallet_runtime.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
);
if let Ok(mut connection_to_alice) = bob_comms_dial_peer {
if bob_wallet_runtime
.block_on(connection_to_alice.open_substream(&MESSAGING_PROTOCOL_ID.clone()))
.is_ok()
{
panic!("Connection to Alice should not be active!");
}
}

// - Bob can still retrieve messages Alice sent
let bob_contacts_get_messages =
bob_wallet_runtime.block_on(bob_wallet_contacts_service.get_messages(alice_wallet_address, 1, 1));
assert!(bob_contacts_get_messages.is_ok());

// Cleanup
wallet_destroy(bob_wallet_ptr);
}
}
}