Skip to content

Commit

Permalink
test: Simplify some tests using the assert_next_with_timeout macro
Browse files Browse the repository at this point in the history
  • Loading branch information
poljar committed Jan 31, 2025
1 parent 3707d2f commit 8d27b0c
Showing 1 changed file with 69 additions and 78 deletions.
147 changes: 69 additions & 78 deletions crates/matrix-sdk/src/room/identity_status_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,14 @@ fn wrap_room_member_events(

#[cfg(test)]
mod tests {
use std::{
pin::{pin, Pin},
time::Duration,
};

use futures_core::Stream;
use futures_util::FutureExt;
use matrix_sdk_base::crypto::{IdentityState, IdentityStatusChange};
use std::time::Duration;

use futures_util::{pin_mut, FutureExt as _, StreamExt as _};
use matrix_sdk_base::crypto::IdentityState;
use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
use test_setup::TestSetup;
use tokio_stream::{StreamExt, Timeout};

use crate::assert_next_with_timeout;

#[async_test]
async fn test_when_user_becomes_unpinned_we_report_it() {
Expand All @@ -215,13 +212,14 @@ mod tests {
t.pin_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob becomes unpinned
t.unpin_bob().await;

// Then we were notified about it
let change = next_change(&mut pin!(changes)).await;
let change = assert_next_with_timeout!(stream);
assert_eq!(change[0].user_id, t.bob_user_id());
assert_eq!(change[0].changed_to, IdentityState::PinViolation);
assert_eq!(change.len(), 1);
Expand All @@ -236,13 +234,14 @@ mod tests {
t.verify_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob's identity changes
t.unpin_bob().await;

// Then we were notified about a verification violation
let change = next_change(&mut pin!(changes)).await;
let change = assert_next_with_timeout!(stream);
assert_eq!(change[0].user_id, t.bob_user_id());
assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
assert_eq!(change.len(), 1);
Expand All @@ -257,20 +256,20 @@ mod tests {
t.unpin_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let mut changes = pin!(changes);
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob becomes pinned
t.pin_bob().await;

// Then we were notified about the initial state of the room
let change1 = next_change(&mut changes).await;
let change1 = assert_next_with_timeout!(stream);
assert_eq!(change1[0].user_id, t.bob_user_id());
assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
assert_eq!(change1.len(), 1);

// And the change when Bob became pinned
let change2 = next_change(&mut changes).await;
let change2 = assert_next_with_timeout!(stream);
assert_eq!(change2[0].user_id, t.bob_user_id());
assert_eq!(change2[0].changed_to, IdentityState::Pinned);
assert_eq!(change2.len(), 1);
Expand All @@ -282,8 +281,8 @@ mod tests {
let t = TestSetup::new_room_with_other_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let mut changes = pin!(changes);
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob becomes verified
t.verify_bob().await;
Expand All @@ -292,10 +291,10 @@ mod tests {
t.unpin_bob().await;

// Then we are only notified about the unpinning part
let change2 = next_change(&mut changes).await;
assert_eq!(change2[0].user_id, t.bob_user_id());
assert_eq!(change2[0].changed_to, IdentityState::VerificationViolation);
assert_eq!(change2.len(), 1);
let change = assert_next_with_timeout!(stream);
assert_eq!(change[0].user_id, t.bob_user_id());
assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
assert_eq!(change.len(), 1);
}

#[async_test]
Expand All @@ -307,20 +306,20 @@ mod tests {
t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let mut changes = pin!(changes);
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob becomes verified
t.verify_bob().await;

// Then we were notified about the initial state of the room
let change1 = next_change(&mut changes).await;
let change1 = assert_next_with_timeout!(stream);
assert_eq!(change1[0].user_id, t.bob_user_id());
assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
assert_eq!(change1.len(), 1);

// And the change when Bob became verified
let change2 = next_change(&mut changes).await;
let change2 = assert_next_with_timeout!(stream);
assert_eq!(change2[0].user_id, t.bob_user_id());
assert_eq!(change2[0].changed_to, IdentityState::Verified);
assert_eq!(change2.len(), 1);
Expand All @@ -341,20 +340,20 @@ mod tests {
t.unpin_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let mut changes = pin!(changes);
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob becomes verified
t.verify_bob().await;

// Then we were notified about the initial state of the room
let change1 = next_change(&mut changes).await;
let change1 = assert_next_with_timeout!(stream);
assert_eq!(change1[0].user_id, t.bob_user_id());
assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
assert_eq!(change1.len(), 1);

// And the change when Bob became verified
let change2 = next_change(&mut changes).await;
let change2 = assert_next_with_timeout!(stream);
assert_eq!(change2[0].user_id, t.bob_user_id());
assert_eq!(change2[0].changed_to, IdentityState::Verified);
assert_eq!(change2.len(), 1);
Expand All @@ -369,13 +368,14 @@ mod tests {
t.unpin_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob joins the room
t.bob_joins().await;

// Then we were notified about it
let change = next_change(&mut pin!(changes)).await;
let change = assert_next_with_timeout!(stream);
assert_eq!(change[0].user_id, t.bob_user_id());
assert_eq!(change[0].changed_to, IdentityState::PinViolation);
assert_eq!(change.len(), 1);
Expand All @@ -391,13 +391,14 @@ mod tests {
t.unpin_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob joins the room
t.bob_joins().await;

// Then we were notified about it
let change = next_change(&mut pin!(changes)).await;
let change = assert_next_with_timeout!(stream);
assert_eq!(change[0].user_id, t.bob_user_id());
assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
assert_eq!(change.len(), 1);
Expand All @@ -412,7 +413,8 @@ mod tests {
t.verify_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob joins the room
t.bob_joins().await;
Expand All @@ -421,8 +423,7 @@ mod tests {
t.unpin_bob().await;

//// Then we were only notified about the unpin
let mut changes = pin!(changes);
let change = next_change(&mut changes).await;
let change = assert_next_with_timeout!(stream);
assert_eq!(change[0].user_id, t.bob_user_id());
assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
assert_eq!(change.len(), 1);
Expand All @@ -437,15 +438,15 @@ mod tests {
t.pin_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let mut changes = pin!(changes);
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob joins the room
t.bob_joins().await;

// Then there is no notification
tokio::time::sleep(Duration::from_millis(200)).await;
let change = changes.next().now_or_never();
let change = stream.next().now_or_never();
assert!(change.is_none());
}

Expand All @@ -458,20 +459,20 @@ mod tests {
t.unpin_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let mut changes = pin!(changes);
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// When Bob leaves the room
t.bob_leaves().await;

// Then we were notified about the initial state of the room
let change1 = next_change(&mut changes).await;
let change1 = assert_next_with_timeout!(stream);
assert_eq!(change1[0].user_id, t.bob_user_id());
assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
assert_eq!(change1.len(), 1);

// And we were notified about the change when the user left
let change2 = next_change(&mut changes).await;
let change2 = assert_next_with_timeout!(stream);
// Note: the user left the room, but we see that as them "becoming pinned" i.e.
// "you no longer need to notify about this user".
assert_eq!(change2[0].user_id, t.bob_user_id());
Expand All @@ -488,36 +489,35 @@ mod tests {
t.unpin_bob().await;

// And we are listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let mut changes = pin!(changes);
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// NOTE: below we pull the changes out of the subscription after each action.
// This makes sure that the identity changes and membership changes are
// properly ordered. If we pull them out later, the identity changes get
// shifted forward because they rely on less-complex async stuff under
// the hood. Calling next_change ends up winding the async
// machinery sufficiently that the membership change and any subsequent events
// have fully completed.
// This makes sure that the identity changes and membership changes are properly
// ordered. If we pull them out later, the identity changes get shifted forward
// because they rely on less-complex async stuff under the hood. Calling
// next_change ends up winding the async machinery sufficiently that the
// membership change and any subsequent events have fully completed.

// When Bob joins the room ...
t.bob_joins().await;
let change1 = next_change(&mut changes).await;
let change1 = assert_next_with_timeout!(stream);

// ... becomes pinned ...
t.pin_bob().await;
let change2 = next_change(&mut changes).await;
let change2 = assert_next_with_timeout!(stream);

// ... leaves and joins again (ignored since they stay pinned) ...
t.bob_leaves().await;
t.bob_joins().await;

// ... becomes unpinned ...
t.unpin_bob().await;
let change3 = next_change(&mut changes).await;
let change3 = assert_next_with_timeout!(stream);

// ... and leaves.
t.bob_leaves().await;
let change4 = next_change(&mut changes).await;
let change4 = assert_next_with_timeout!(stream);

assert_eq!(change1[0].user_id, t.bob_user_id());
assert_eq!(change2[0].user_id, t.bob_user_id());
Expand All @@ -542,10 +542,11 @@ mod tests {
t.unpin_bob().await;

// When we start listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// Then we were immediately notified about Bob being unpinned
let change = next_change(&mut pin!(changes)).await;
let change = assert_next_with_timeout!(stream);
assert_eq!(change[0].user_id, t.bob_user_id());
assert_eq!(change[0].changed_to, IdentityState::PinViolation);
assert_eq!(change.len(), 1);
Expand All @@ -558,34 +559,26 @@ mod tests {
t.verify_bob().await;

// When we start listening for identity changes
let changes = t.subscribe_to_identity_status_changes().await;
let stream = t.subscribe_to_identity_status_changes().await;
pin_mut!(stream);

// (And we unpin so that something is available in the changes stream)
t.unpin_bob().await;

// Then we were only notified about the unpin, not being verified
let change = next_change(&mut pin!(changes)).await;
assert_eq!(change[0].user_id, t.bob_user_id());
assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
assert_eq!(change.len(), 1);
let next_change = assert_next_with_timeout!(stream);

assert_eq!(next_change[0].user_id, t.bob_user_id());
assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
assert_eq!(next_change.len(), 1);
}

// TODO: I (andyb) haven't figured out how to test room membership changes that
// affect our own user (they should not be shown). Specifically, I haven't
// figure out how to get out own user into a non-pinned state.

async fn next_change(
changes: &mut Pin<&mut Timeout<impl Stream<Item = Vec<IdentityStatusChange>>>>,
) -> Vec<IdentityStatusChange> {
changes
.next()
.await
.expect("Should not reach end of changes stream")
.expect("Should not time out waiting for a change")
}

mod test_setup {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{SystemTime, UNIX_EPOCH};

use futures_core::Stream;
use matrix_sdk_base::{
Expand All @@ -605,7 +598,6 @@ mod tests {
owned_user_id, OwnedUserId, TransactionId, UserId,
};
use serde_json::json;
use tokio_stream::{StreamExt as _, Timeout};
use wiremock::{
matchers::{header, method, path_regex},
Mock, MockServer, ResponseTemplate,
Expand Down Expand Up @@ -786,12 +778,11 @@ mod tests {

pub(super) async fn subscribe_to_identity_status_changes(
&self,
) -> Timeout<impl Stream<Item = Vec<IdentityStatusChange>>> {
) -> impl Stream<Item = Vec<IdentityStatusChange>> {
self.room
.subscribe_to_identity_status_changes()
.await
.expect("Should be able to subscribe")
.timeout(Duration::from_secs(5))
}

async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
Expand Down

0 comments on commit 8d27b0c

Please sign in to comment.