Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SledStore to store full timeline #288

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 76 additions & 2 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{
use matrix_sdk_common::{
deserialized_responses::{
AmbiguityChanges, JoinedRoom, LeftRoom, MemberEvent, MembersResponse, Rooms,
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline,
StrippedMemberEvent, SyncResponse, SyncRoomEvent, Timeline, TimelineSlice,
},
instant::Instant,
locks::RwLock,
Expand All @@ -50,7 +50,11 @@ use ruma::{
DeviceId,
};
use ruma::{
api::client::r0::{self as api, push::get_notifications::Notification},
api::client::r0::{
self as api,
message::get_message_events::{Direction, Response as GetMessageEventsResponse},
push::get_notifications::Notification,
},
events::{
room::member::MembershipState, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent,
AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, AnySyncRoomEvent, AnySyncStateEvent,
Expand Down Expand Up @@ -773,6 +777,15 @@ impl BaseClient {
let notification_count = new_info.unread_notifications.into();
room_info.update_notification_count(notification_count);

changes.add_timeline(
&room_id,
TimelineSlice::new(
timeline.events.iter().cloned().rev().collect(),
next_batch.clone(),
timeline.prev_batch.clone(),
),
);

new_rooms.join.insert(
room_id,
JoinedRoom::new(
Expand Down Expand Up @@ -816,6 +829,14 @@ impl BaseClient {
self.handle_room_account_data(&room_id, &new_info.account_data.events, &mut changes)
.await;

changes.add_timeline(
&room_id,
TimelineSlice::new(
timeline.events.iter().cloned().rev().collect(),
next_batch.clone(),
timeline.prev_batch.clone(),
),
);
changes.add_room(room_info);
new_rooms
.leave
Expand Down Expand Up @@ -892,6 +913,59 @@ impl BaseClient {
}
}

/// Receive a successful /messages response.
///
/// * `response` - The successful response from /messages.
pub async fn receive_messages(
&self,
room_id: &RoomId,
direction: &Direction,
response: &GetMessageEventsResponse,
) -> Result<Vec<SyncRoomEvent>> {
let mut changes = StateChanges::default();

let mut events: Vec<SyncRoomEvent> = vec![];
for event in &response.chunk {
#[allow(unused_mut)]
let mut event: SyncRoomEvent = event.clone().into();

#[cfg(feature = "encryption")]
match event.event.deserialize() {
Ok(AnySyncRoomEvent::Message(AnySyncMessageEvent::RoomEncrypted(encrypted))) => {
if let Some(olm) = self.olm_machine().await {
if let Ok(decrypted) = olm.decrypt_room_event(&encrypted, room_id).await {
event = decrypted.into();
}
}
}
Ok(_) => {}
Err(error) => {
warn!("Error deserializing event {:?}", error);
}
}

events.push(event);
}

let (chunk, start, end) = match direction {
Direction::Backward => {
(events.clone(), response.start.clone().unwrap(), response.end.clone())
}
Direction::Forward => (
events.iter().rev().cloned().collect(),
response.end.clone().unwrap(),
response.start.clone(),
),
};

let timeline = TimelineSlice::new(chunk, start, end);
changes.add_timeline(room_id, timeline);

self.store().save_changes(&changes).await?;

Ok(events)
}

/// Receive a get member events response and convert it to a deserialized
/// `MembersResponse`
///
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ pub use client::{BaseClient, BaseClientConfig};
#[cfg(feature = "encryption")]
pub use matrix_sdk_crypto as crypto;
pub use rooms::{Room, RoomInfo, RoomMember, RoomType};
pub use store::{StateChanges, StateStore, Store, StoreError};
pub use store::{StateChanges, StateStore, Store, StoreError, StoredTimelineSlice};
37 changes: 36 additions & 1 deletion crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use dashmap::{DashMap, DashSet};
use lru::LruCache;
use matrix_sdk_common::{async_trait, instant::Instant, locks::Mutex};
use ruma::{
api::client::r0::message::get_message_events::Direction,
events::{
presence::PresenceEvent,
receipt::Receipt,
Expand All @@ -34,7 +35,7 @@ use ruma::{
};
use tracing::info;

use super::{Result, RoomInfo, StateChanges, StateStore};
use super::{Result, RoomInfo, StateChanges, StateStore, StoredTimelineSlice};
use crate::{
deserialized_responses::{MemberEvent, StrippedMemberEvent},
media::{MediaRequest, UniqueKey},
Expand Down Expand Up @@ -272,6 +273,8 @@ impl MemoryStore {
}
}

// TODO: implement writing timeline to the store.

info!("Saved changes in {:?}", now.elapsed());

Ok(())
Expand Down Expand Up @@ -438,6 +441,23 @@ impl MemoryStore {

Ok(())
}

async fn get_timeline(
&self,
_room_id: &RoomId,
_start: Option<&EventId>,
_end: Option<&EventId>,
_limit: Option<usize>,
_direction: Direction,
) -> Result<Option<StoredTimelineSlice>> {
// TODO: implement reading from the store.
Ok(None)
}

async fn remove_timeline(&self, _room_id: Option<&RoomId>) -> Result<()> {
// TODO: implement once writing the timeline to the store is implemented.
Ok(())
}
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
Expand Down Expand Up @@ -584,6 +604,21 @@ impl StateStore for MemoryStore {
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
self.remove_media_content_for_uri(uri).await
}

async fn get_timeline(
&self,
room_id: &RoomId,
start: Option<&EventId>,
end: Option<&EventId>,
limit: Option<usize>,
direction: Direction,
) -> Result<Option<StoredTimelineSlice>> {
self.get_timeline(room_id, start, end, limit, direction).await
}

async fn remove_timeline(&self, room_id: Option<&RoomId>) -> Result<()> {
self.remove_timeline(room_id).await
}
}

#[cfg(test)]
Expand Down
63 changes: 61 additions & 2 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::{
use dashmap::DashMap;
use matrix_sdk_common::{async_trait, locks::RwLock, AsyncTraitDeps};
use ruma::{
api::client::r0::push::get_notifications::Notification,
api::client::r0::{
message::get_message_events::Direction, push::get_notifications::Notification,
},
events::{
presence::PresenceEvent,
receipt::{Receipt, ReceiptEventContent},
Expand All @@ -39,7 +41,7 @@ use ruma::{
use sled::Db;

use crate::{
deserialized_responses::{MemberEvent, StrippedMemberEvent},
deserialized_responses::{MemberEvent, StrippedMemberEvent, SyncRoomEvent, TimelineSlice},
media::MediaRequest,
rooms::{RoomInfo, RoomType},
Room, Session,
Expand Down Expand Up @@ -313,6 +315,37 @@ pub trait StateStore: AsyncTraitDeps {
///
/// * `uri` - The `MxcUri` of the media files.
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()>;

/// Get a slice of the timeline of a room.
///
/// # Arguments
///
/// * `room_id` - The id of the room for which the timeline should be
/// fetched.
///
/// * `start` - The start point from which events should be returned.
///
/// * `end` - The end point to which events should be returned.
///
/// * `limit` - The maximum number of events to return.
///
/// * `direction` - The direction events should be returned.
async fn get_timeline(
&self,
room_id: &RoomId,
start: Option<&EventId>,
end: Option<&EventId>,
limit: Option<usize>,
direction: Direction,
) -> Result<Option<StoredTimelineSlice>>;

/// Remove the stored timeline.
///
/// # Arguments
///
/// * `room_id` - The id of the room for which the timeline should be
/// removed. If `None` the timeline for every stored room is removed.
async fn remove_timeline(&self, room_id: Option<&RoomId>) -> Result<()>;
}

/// A state store wrapper for the SDK.
Expand Down Expand Up @@ -486,6 +519,8 @@ pub struct StateChanges {
pub ambiguity_maps: BTreeMap<Box<RoomId>, BTreeMap<String, BTreeSet<Box<UserId>>>>,
/// A map of `RoomId` to a vector of `Notification`s
pub notifications: BTreeMap<Box<RoomId>, Vec<Notification>>,
/// A mapping of `RoomId` to a `TimelineSlice`
pub timeline: BTreeMap<Box<RoomId>, TimelineSlice>,
}

impl StateChanges {
Expand Down Expand Up @@ -570,4 +605,28 @@ impl StateChanges {
pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
self.receipts.insert(room_id.to_owned(), event);
}

/// Update the `StateChanges` struct with the given room with a new
/// `TimelineSlice`.
pub fn add_timeline(&mut self, room_id: &RoomId, timeline: TimelineSlice) {
self.timeline.insert(room_id.to_owned(), timeline);
}
}

/// A slice of the timeline obtained from the store.
#[derive(Debug, Default)]
pub struct StoredTimelineSlice {
/// A start token to fetch more events if the requested slice isn't fully
/// known.
pub token: Option<String>,

/// The requested events
pub events: Vec<SyncRoomEvent>,
}

#[cfg(feature = "sled_state_store")]
impl StoredTimelineSlice {
pub(crate) fn new(events: Vec<SyncRoomEvent>, token: Option<String>) -> Self {
Self { token, events }
}
}
Loading