From 3bc232061a69c6a90340e96159d22b84f178297d Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Tue, 18 Feb 2025 16:47:54 +0100 Subject: [PATCH 01/15] move `RedapServers` to `AppState` and make it persistent (if empty) --- Cargo.lock | 1 + crates/viewer/re_redap_browser/Cargo.toml | 1 + crates/viewer/re_redap_browser/src/servers.rs | 10 +++++++--- crates/viewer/re_viewer/src/app.rs | 15 ++++++--------- crates/viewer/re_viewer/src/app_state.rs | 6 +++++- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4800a382af05..705a4ea891e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6452,6 +6452,7 @@ dependencies = [ "re_ui", "re_view_dataframe", "re_viewer_context", + "serde", "thiserror 1.0.65", "tokio-stream", "tonic", diff --git a/crates/viewer/re_redap_browser/Cargo.toml b/crates/viewer/re_redap_browser/Cargo.toml index 0a63937b991e..a0bdc283a5c4 100644 --- a/crates/viewer/re_redap_browser/Cargo.toml +++ b/crates/viewer/re_redap_browser/Cargo.toml @@ -40,6 +40,7 @@ egui.workspace = true egui_table.workspace = true once_cell.workspace = true parking_lot.workspace = true +serde.workspace = true thiserror.workspace = true tokio-stream.workspace = true diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs index 2a06ad9f2581..c82a424fb1fa 100644 --- a/crates/viewer/re_redap_browser/src/servers.rs +++ b/crates/viewer/re_redap_browser/src/servers.rs @@ -39,15 +39,19 @@ pub struct RecordingCollection { /// All catalogs known to the viewer. // TODO(andreas,antoine): Eventually, collections are part of a catalog, meaning there is going to be multiple ones. -#[derive(Default)] +#[derive(Default, serde::Serialize, serde::Deserialize)] pub struct RedapServers { + //TODO: server + //servers: Vec, + // TODO(andreas,antoine): One of those Urls is probably going to be a local catalog. + #[serde(skip)] catalogs: Arc>>, - // TODO(andreas,antoine): Keep track of in-flight requests. - //in_flight_requests: HashMap>>, + #[serde(skip)] selected_collection: Option, + #[serde(skip)] command_queue: Arc>>, } diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs index f21703a8d6d4..0595030e8529 100644 --- a/crates/viewer/re_viewer/src/app.rs +++ b/crates/viewer/re_viewer/src/app.rs @@ -7,7 +7,6 @@ use re_capabilities::MainThreadToken; use re_data_source::{DataSource, FileContents}; use re_entity_db::entity_db::EntityDb; use re_log_types::{ApplicationId, FileSource, LogMsg, StoreKind}; -use re_redap_browser::RedapServers; use re_renderer::WgpuResourcePoolStatistics; use re_smart_channel::{ReceiveSet, SmartChannelSource}; use re_ui::{notifications, DesignTokens, UICommand, UICommandSender}; @@ -205,9 +204,6 @@ pub struct App { /// Interface for all recordings and blueprints pub(crate) store_hub: Option, - /// Redap server catalogs and browser UI - redap_servers: RedapServers, - /// Notification panel. pub(crate) notifications: notifications::NotificationUi, @@ -422,7 +418,6 @@ impl App { blueprint_loader(), &crate::app_blueprint::setup_welcome_screen_blueprint, )), - redap_servers: RedapServers::default(), notifications: notifications::NotificationUi::new(), memory_panel: Default::default(), @@ -590,7 +585,9 @@ impl App { match data_source.stream(Some(waker)) { Ok(re_data_source::StreamSource::LogMessages(rx)) => self.add_receiver(rx), Ok(re_data_source::StreamSource::CatalogData { origin: url }) => { - self.redap_servers.fetch_catalog(&self.async_runtime, url); + self.state + .redap_servers + .fetch_catalog(&self.async_runtime, url); } Err(err) => { re_log::error!("Failed to open data source: {}", re_error::format(err)); @@ -1200,7 +1197,7 @@ impl App { #[cfg(not(target_arch = "wasm32"))] let is_history_enabled = false; - self.redap_servers.on_frame_start(); + self.state.redap_servers.on_frame_start(); render_ctx.begin_frame(); self.state.show( @@ -1209,7 +1206,6 @@ impl App { render_ctx, entity_db, store_context, - &self.redap_servers, &self.reflection, &self.component_ui_registry, &self.view_class_registry, @@ -1736,7 +1732,8 @@ impl App { } pub fn fetch_catalog(&self, origin: re_grpc_client::redap::Origin) { - self.redap_servers + self.state + .redap_servers .fetch_catalog(&self.async_runtime, origin); } } diff --git a/crates/viewer/re_viewer/src/app_state.rs b/crates/viewer/re_viewer/src/app_state.rs index dade0c4795b6..fd3d41940b2a 100644 --- a/crates/viewer/re_viewer/src/app_state.rs +++ b/crates/viewer/re_viewer/src/app_state.rs @@ -63,6 +63,9 @@ pub struct AppState { #[serde(skip)] datastore_ui: re_chunk_store_ui::DatastoreUi, + /// Redap server catalogs and browser UI + pub(crate) redap_servers: RedapServers, + /// The current display mode. #[serde(skip)] pub(crate) display_mode: DisplayMode, @@ -105,6 +108,7 @@ impl Default for AppState { blueprint_tree: Default::default(), welcome_screen: Default::default(), datastore_ui: Default::default(), + redap_servers: Default::default(), display_mode: DisplayMode::LocalRecordings, show_settings_ui: false, view_states: Default::default(), @@ -159,7 +163,6 @@ impl AppState { render_ctx: &re_renderer::RenderContext, recording: &EntityDb, store_context: &StoreContext<'_>, - redap_servers: &RedapServers, reflection: &re_types_core::reflection::Reflection, component_ui_registry: &ComponentUiRegistry, view_class_registry: &ViewClassRegistry, @@ -184,6 +187,7 @@ impl AppState { blueprint_tree, welcome_screen, datastore_ui, + redap_servers, display_mode, show_settings_ui, view_states, From 460d9820f82b88f79a550e79f88126d6cf4479cd Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Wed, 19 Feb 2025 16:32:28 +0100 Subject: [PATCH 02/15] Refactored collection query code + add/remove server icons + add server modal --- Cargo.lock | 1 + crates/store/re_grpc_client/Cargo.toml | 3 +- .../store/re_grpc_client/src/redap/address.rs | 11 +- crates/top/rerun/src/commands/entrypoint.rs | 2 +- .../re_redap_browser/src/add_server_modal.rs | 60 +++ .../re_redap_browser/src/collection_ui.rs | 7 +- .../re_redap_browser/src/collections.rs | 177 ++++++++ crates/viewer/re_redap_browser/src/context.rs | 13 + crates/viewer/re_redap_browser/src/lib.rs | 3 + crates/viewer/re_redap_browser/src/servers.rs | 393 +++++++++--------- crates/viewer/re_viewer/src/app.rs | 12 +- 11 files changed, 468 insertions(+), 214 deletions(-) create mode 100644 crates/viewer/re_redap_browser/src/add_server_modal.rs create mode 100644 crates/viewer/re_redap_browser/src/collections.rs create mode 100644 crates/viewer/re_redap_browser/src/context.rs diff --git a/Cargo.lock b/Cargo.lock index 705a4ea891e9..f4055a9f3f00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6171,6 +6171,7 @@ dependencies = [ "re_protos", "re_smart_channel", "re_sorbet", + "serde", "thiserror 1.0.65", "tokio", "tokio-stream", diff --git a/crates/store/re_grpc_client/Cargo.toml b/crates/store/re_grpc_client/Cargo.toml index fc1defaa7a25..9ef844a6c888 100644 --- a/crates/store/re_grpc_client/Cargo.toml +++ b/crates/store/re_grpc_client/Cargo.toml @@ -31,9 +31,10 @@ re_sorbet.workspace = true arrow.workspace = true async-stream.workspace = true +serde.workspace = true thiserror.workspace = true tokio-stream.workspace = true -url.workspace = true +url = { workspace = true, features = ["serde"] } # Native dependencies: [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/crates/store/re_grpc_client/src/redap/address.rs b/crates/store/re_grpc_client/src/redap/address.rs index 944e9642efe2..35a811182877 100644 --- a/crates/store/re_grpc_client/src/redap/address.rs +++ b/crates/store/re_grpc_client/src/redap/address.rs @@ -4,9 +4,10 @@ //! `rerun://`, which is an alias for `rerun+https://`. These schemes are then //! converted on the fly to either `http://` or `https://`. -use re_protos::remote_store::v0::storage_node_client::StorageNodeClient; use std::net::Ipv4Addr; +use re_protos::remote_store::v0::storage_node_client::StorageNodeClient; + #[derive(thiserror::Error, Debug)] pub enum ConnectionError { /// Native connection error @@ -39,7 +40,9 @@ pub enum ConnectionError { /// The different schemes supported by Rerun. /// /// We support `rerun`, `rerun+http`, and `rerun+https`. -#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] +#[derive( + Debug, PartialEq, Eq, Copy, Clone, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize, +)] pub enum Scheme { Rerun, RerunHttp, @@ -66,7 +69,9 @@ impl Scheme { } } -#[derive(Debug, PartialEq, Eq, Clone, Hash)] +#[derive( + Debug, PartialEq, Eq, Clone, Hash, PartialOrd, Ord, serde::Serialize, serde::Deserialize, +)] pub struct Origin { scheme: Scheme, host: url::Host, diff --git a/crates/top/rerun/src/commands/entrypoint.rs b/crates/top/rerun/src/commands/entrypoint.rs index a89fd5bfc8c7..8723dfe8a1e5 100644 --- a/crates/top/rerun/src/commands/entrypoint.rs +++ b/crates/top/rerun/src/commands/entrypoint.rs @@ -903,7 +903,7 @@ fn run_impl( app.set_examples_manifest_url(url); } for catalog in catalog_origins { - app.fetch_catalog(catalog); + app.add_redap_server(catalog); } Box::new(app) }), diff --git a/crates/viewer/re_redap_browser/src/add_server_modal.rs b/crates/viewer/re_redap_browser/src/add_server_modal.rs new file mode 100644 index 000000000000..15c8d364af7e --- /dev/null +++ b/crates/viewer/re_redap_browser/src/add_server_modal.rs @@ -0,0 +1,60 @@ +use crate::context::Context; +use crate::servers::Command; +use re_grpc_client::redap; +use re_ui::modal::{ModalHandler, ModalWrapper}; +use re_ui::UiExt; + +#[derive(Default)] +pub struct AddServerModal { + modal: ModalHandler, + url: String, +} + +impl AddServerModal { + pub fn open(&mut self) { + self.url = "rerun://".to_owned(); + self.modal.open(); + } + + //TODO(ab): make that UI a form with a scheme popup, a host text field, and a pre-filled port field + //TODO(ab): handle ESC and return + pub fn ui(&mut self, ctx: &Context<'_>, ui: &egui::Ui) { + self.modal.ui( + ui.ctx(), + || ModalWrapper::new("Add Server"), + |ui, keep_open| { + ui.label("URL:"); + ui.add(egui::TextEdit::singleline(&mut self.url).lock_focus(false)); + + let origin = redap::Origin::try_from(self.url.as_ref()); + + match &origin { + Ok(_) => { + ui.success_label("URL is valid"); + } + Err(err) => { + ui.error_label(format!("Unable to parse server URL: {err}")); + } + } + + ui.add_space(24.0); + + ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| { + if let Ok(origin) = origin { + if ui.button("Add").clicked() { + *keep_open = false; + + let _ = ctx.command_sender.send(Command::AddServer(origin)); + } + } else { + ui.add_enabled(false, egui::Button::new("Add")); + } + + if ui.button("Cancel").clicked() { + *keep_open = false; + } + }); + }, + ); + } +} diff --git a/crates/viewer/re_redap_browser/src/collection_ui.rs b/crates/viewer/re_redap_browser/src/collection_ui.rs index b154f97f6544..4e9975df4076 100644 --- a/crates/viewer/re_redap_browser/src/collection_ui.rs +++ b/crates/viewer/re_redap_browser/src/collection_ui.rs @@ -15,7 +15,8 @@ use re_ui::UiExt as _; use re_view_dataframe::display_record_batch::{DisplayRecordBatch, DisplayRecordBatchError}; use re_viewer_context::ViewerContext; -use super::servers::{Command, RecordingCollection}; +use super::servers::Command; +use crate::collections::Collection; #[derive(thiserror::Error, Debug)] enum CollectionUiError { @@ -30,7 +31,7 @@ pub fn collection_ui( ctx: &ViewerContext<'_>, ui: &mut egui::Ui, origin: &redap::Origin, - collection: &RecordingCollection, + collection: &Collection, ) -> Vec { let mut commands = vec![]; @@ -44,7 +45,7 @@ pub fn collection_ui( }; // The table id mainly drives column widths, along with the id of each column. - let table_id_salt = collection.collection_id.with("__collection_table__"); + let table_id_salt = egui::Id::new(collection.collection_id).with("__collection_table__"); let num_rows = collection .collection diff --git a/crates/viewer/re_redap_browser/src/collections.rs b/crates/viewer/re_redap_browser/src/collections.rs new file mode 100644 index 000000000000..6fbc1f974372 --- /dev/null +++ b/crates/viewer/re_redap_browser/src/collections.rs @@ -0,0 +1,177 @@ +use std::sync::Arc; + +use ahash::HashMap; +use parking_lot::Mutex; +use tokio_stream::StreamExt as _; + +use re_grpc_client::{redap, StreamError, TonicStatusError}; +use re_log_encoding::codec::wire::decoder::Decode as _; +use re_protos::remote_store::v0::QueryCatalogRequest; +use re_sorbet::{BatchType, SorbetBatch}; +use re_ui::{list_item, UiExt}; +use re_viewer_context::AsyncRuntimeHandle; + +use crate::context::Context; +use crate::servers::Command; + +/// An id for a [`Collection`]. +/// //TODO(ab): this should be a properly defined id provided by the redap server +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct CollectionId(pub egui::Id); + +/// An individual collection of recordings within a catalog. +pub struct Collection { + pub collection_id: CollectionId, + + pub name: String, + + pub collection: Vec, +} + +/// A handle on an in-flight collection query. Contains `Some(Ok(_))` or `Some(Err(_))` once the +/// query has completed. +struct CollectionQueryHandle { + result: Arc>>>, +} + +impl CollectionQueryHandle { + /// Initiate a collection query call. + pub fn new(runtime: &AsyncRuntimeHandle, origin: redap::Origin) -> Self { + let result = Arc::new(Mutex::new(None)); + let handle = Self { + result: result.clone(), + }; + + runtime.spawn_future(async move { + let collection = stream_catalog_async(origin.clone()).await; + result.lock().replace(collection); + }); + + handle + } +} + +/// Either a [`Collection`] or a handle on the query to get it. +enum CollectionOrQueryHandle { + QueryHandle(CollectionQueryHandle), + Collection(Result), +} + +/// A collection of [`Collection`]s. +#[derive(Default)] +pub struct Collections { + collections: HashMap, +} + +impl Collections { + pub fn add(&mut self, runtime: &AsyncRuntimeHandle, origin: redap::Origin) { + //TODO(ab): should we return error if the requested collection already exists? Or maybe just + // query it again. + self.collections.entry(origin.clone()).or_insert_with(|| { + CollectionOrQueryHandle::QueryHandle(CollectionQueryHandle::new(runtime, origin)) + }); + } + + /// Convert all completed queries into proper collections. + pub fn on_frame_start(&mut self) { + for collection in self.collections.values_mut() { + let result = match collection { + CollectionOrQueryHandle::QueryHandle(handle) => handle.result.lock().take(), + CollectionOrQueryHandle::Collection(_) => None, + }; + + if let Some(result) = result { + *collection = CollectionOrQueryHandle::Collection(result); + } + } + } + + /// Find a [`Collection`] with the given [`CollectionId`]. + pub fn find(&self, collection_id: CollectionId) -> Option<&Collection> { + self.collections + .values() + .filter_map(|handle| match handle { + CollectionOrQueryHandle::QueryHandle(_) => None, + CollectionOrQueryHandle::Collection(collection) => collection.as_ref().ok(), + }) + .find(|collection| collection.collection_id == collection_id) + } + + /// [`list_item::ListItem`]-based UI for the collections. + pub fn panel_ui(&self, ctx: &Context<'_>, ui: &mut egui::Ui) { + for collection in self.collections.values() { + match collection { + CollectionOrQueryHandle::QueryHandle(_) => { + ui.list_item_flat_noninteractive( + list_item::LabelContent::new("Loading default collection…").italics(true), + ); + } + CollectionOrQueryHandle::Collection(Ok(collection)) => { + let is_selected = *ctx.selected_collection == Some(collection.collection_id); + + let content = list_item::LabelContent::new(&collection.name); + let response = ui.list_item().selected(is_selected).show_flat(ui, content); + + if response.clicked() { + let _ = ctx + .command_sender + .send(Command::SelectCollection(collection.collection_id)); + } + } + CollectionOrQueryHandle::Collection(Err(err)) => { + ui.list_item_flat_noninteractive(list_item::LabelContent::new( + egui::RichText::new("Failed to load").color(ui.visuals().error_fg_color), + )) + .on_hover_text(err.to_string()); + } + } + } + } +} + +async fn stream_catalog_async(origin: redap::Origin) -> Result { + let mut client = origin.client().await?; + + re_log::debug!("Fetching collection…"); + + let catalog_query_response = client + .query_catalog(QueryCatalogRequest { + column_projection: None, // fetch all columns + filter: None, // fetch all rows + }) + .await + .map_err(TonicStatusError)?; + + let sorbet_batches = catalog_query_response + .into_inner() + .map(|streaming_result| { + streaming_result + .and_then(|result| { + result + .decode() + .map_err(|err| tonic::Status::internal(err.to_string())) + }) + .map_err(TonicStatusError) + .map_err(StreamError::from) + }) + .map(|record_batch| { + record_batch.and_then(|record_batch| { + SorbetBatch::try_from_record_batch(&record_batch, BatchType::Dataframe) + .map_err(Into::into) + }) + }) + .collect::, _>>() + .await?; + + //TODO(ab): ideally this is provided by the server + let collection_id = + CollectionId(egui::Id::new(origin.clone()).with("__top_level_collection__")); + let collection = Collection { + collection_id, + //TODO(ab): this should be provided by the server + name: "default".to_owned(), + collection: sorbet_batches, + }; + + Ok(collection) +} diff --git a/crates/viewer/re_redap_browser/src/context.rs b/crates/viewer/re_redap_browser/src/context.rs new file mode 100644 index 000000000000..346338516795 --- /dev/null +++ b/crates/viewer/re_redap_browser/src/context.rs @@ -0,0 +1,13 @@ +use std::sync::mpsc::Sender; + +use crate::collections::CollectionId; +use crate::servers::Command; + +/// Context structure for the redap browser. +pub struct Context<'a> { + /// Sender to queue new commands. + pub command_sender: &'a Sender, + + /// Currently selected collection. + pub selected_collection: &'a Option, +} diff --git a/crates/viewer/re_redap_browser/src/lib.rs b/crates/viewer/re_redap_browser/src/lib.rs index d98c8762e69a..122e37cd120f 100644 --- a/crates/viewer/re_redap_browser/src/lib.rs +++ b/crates/viewer/re_redap_browser/src/lib.rs @@ -1,7 +1,10 @@ //! This crates implements the Redap browser feature, including the communication and UI aspects of //! it. +mod add_server_modal; mod collection_ui; +mod collections; +mod context; mod servers; pub use servers::RedapServers; diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs index c82a424fb1fa..3f31e71325d8 100644 --- a/crates/viewer/re_redap_browser/src/servers.rs +++ b/crates/viewer/re_redap_browser/src/servers.rs @@ -1,98 +1,206 @@ -use std::sync::Arc; +use std::collections::{BTreeMap, BTreeSet}; +use std::sync::mpsc::{Receiver, Sender}; -use ahash::HashMap; -use parking_lot::Mutex; -use tokio_stream::StreamExt as _; - -use re_grpc_client::{redap, StreamError, TonicStatusError}; -use re_log_encoding::codec::wire::decoder::Decode as _; -use re_protos::remote_store::v0::QueryCatalogRequest; -use re_sorbet::{BatchType, SorbetBatch}; +use re_grpc_client::redap; use re_ui::{list_item, UiExt}; use re_viewer_context::{AsyncRuntimeHandle, ViewerContext}; -//TODO(ab): remove this in favor of an id -pub struct CollectionHandle { - server_origin: redap::Origin, - collection_index: usize, -} +use crate::add_server_modal::AddServerModal; +use crate::collections::{Collection, CollectionId, Collections}; +use crate::context::Context; -/// An individual catalog. -pub struct Catalog { - collections: Vec, +struct Server { + origin: redap::Origin, + + collections: Collections, } -impl Catalog { - fn is_empty(&self) -> bool { - self.collections.is_empty() +impl Server { + fn new(runtime: &AsyncRuntimeHandle, origin: redap::Origin) -> Self { + //let default_catalog = FetchCollectionTask::new(runtime, origin.clone()); + + let mut collections = Collections::default(); + + //TODO(ab): For now, we just auto-download the default collection + collections.add(runtime, origin.clone()); + + Self { + origin, + collections, + } } -} -/// An individual collection of recordings within a catalog. -pub struct RecordingCollection { - pub collection_id: egui::Id, + fn on_frame_start(&mut self) { + self.collections.on_frame_start(); + } - pub name: String, + fn find_collection(&self, collection_id: CollectionId) -> Option<&Collection> { + self.collections.find(collection_id) + } + + fn panel_ui(&self, ctx: &Context<'_>, ui: &mut egui::Ui) { + let content = list_item::LabelContent::new(self.origin.to_string()) + .with_buttons(|ui| { + let response = ui + .small_icon_button(&re_ui::icons::REMOVE) + .on_hover_text("Remove server"); + + if response.clicked() { + let _ = ctx + .command_sender + .send(Command::RemoveServer(self.origin.clone())); + } - pub collection: Vec, + response + }) + .always_show_buttons(true); + + ui.list_item() + .interactive(false) + .show_hierarchical_with_children( + ui, + egui::Id::new(&self.origin).with("server_item"), + true, + content, + |ui| { + self.collections.panel_ui(ctx, ui); + }, + ); + } } -/// All catalogs known to the viewer. -// TODO(andreas,antoine): Eventually, collections are part of a catalog, meaning there is going to be multiple ones. -#[derive(Default, serde::Serialize, serde::Deserialize)] +/// All servers known to the viewer, and their catalog data. pub struct RedapServers { - //TODO: server - //servers: Vec, + /// The list of server. + /// + /// This is the only data persisted. Everything else being recreated on the fly. + server_list: BTreeSet, - // TODO(andreas,antoine): One of those Urls is probably going to be a local catalog. - #[serde(skip)] - catalogs: Arc>>, + /// The actual servers, populated from the server list if needed. + /// + /// Servers in `server_list` are automatically added to `server` by `on_frame_start`. + servers: BTreeMap, + + selected_collection: Option, + + // message queue for commands + command_sender: Sender, + command_receiver: Receiver, + + add_server_modal_ui: AddServerModal, +} - #[serde(skip)] - selected_collection: Option, +impl serde::Serialize for RedapServers { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.server_list.serialize(serializer) + } +} - #[serde(skip)] - command_queue: Arc>>, +impl<'de> serde::Deserialize<'de> for RedapServers { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let server_list = BTreeSet::::deserialize(deserializer)?; + let (command_sender, command_receiver) = std::sync::mpsc::channel(); + Ok(Self { + server_list, + servers: BTreeMap::new(), + selected_collection: None, + command_sender, + command_receiver, + add_server_modal_ui: Default::default(), + }) + } +} + +impl Default for RedapServers { + fn default() -> Self { + let (command_sender, command_receiver) = std::sync::mpsc::channel(); + + Self { + server_list: Default::default(), + servers: Default::default(), + selected_collection: None, + command_sender, + command_receiver, + add_server_modal_ui: Default::default(), + } + } } pub enum Command { - SelectCollection(CollectionHandle), + SelectCollection(CollectionId), DeselectCollection, + AddServer(redap::Origin), + RemoveServer(redap::Origin), } impl RedapServers { - /// Asynchronously fetches a catalog from a URL and adds it to the hub. + /// Add a server to the hub. + pub fn add_server(&mut self, origin: redap::Origin) { + self.server_list.insert(origin); + } + + /// Remove a server from the hub. + pub fn remove_server(&mut self, origin: &redap::Origin) { + self.server_list.remove(origin); + self.servers.remove(origin); + } + + /// Per-frame housekeeping. /// - /// If this url was used before, it will refresh the existing catalog in the hub. - pub fn fetch_catalog(&self, runtime: &AsyncRuntimeHandle, origin: redap::Origin) { - let catalogs = self.catalogs.clone(); - runtime.spawn_future(async move { - let result = stream_catalog_async(origin, catalogs).await; - if let Err(err) = result { - // TODO(andreas,ab): Surface this in the UI in a better way. - re_log::error!("Failed to fetch catalog: {err}"); + /// - Process [`Command`]s from the queue. + /// - Load servers from `server_list`. + /// - Update all servers. + pub fn on_frame_start(&mut self, runtime: &AsyncRuntimeHandle) { + while let Ok(command) = self.command_receiver.try_recv() { + self.handle_command(command); + } + + for origin in &self.server_list { + if !self.servers.contains_key(origin) { + self.servers + .insert(origin.clone(), Server::new(runtime, origin.clone())); } - }); - } + } - /// Process any pending commands - pub fn on_frame_start(&mut self) { - for command in self.command_queue.lock().drain(..) { - match command { - Command::SelectCollection(collection_handle) => { - self.selected_collection = Some(collection_handle); - } + for server in self.servers.values_mut() { + server.on_frame_start(); + } + } - Command::DeselectCollection => self.selected_collection = None, + fn handle_command(&mut self, command: Command) { + match command { + Command::SelectCollection(collection_handle) => { + self.selected_collection = Some(collection_handle); } + + Command::DeselectCollection => self.selected_collection = None, + + Command::AddServer(origin) => self.add_server(origin), + + Command::RemoveServer(origin) => self.remove_server(&origin), } } - pub fn server_panel_ui(&self, ui: &mut egui::Ui) { + pub fn server_panel_ui(&mut self, ui: &mut egui::Ui) { ui.panel_content(|ui| { - ui.panel_title_bar( + ui.panel_title_bar_with_buttons( "Servers", Some("These are the currently connected Redap servers."), + |ui| { + if ui + .small_icon_button(&re_ui::icons::ADD) + .on_hover_text("Add a server") + .clicked() + { + self.add_server_modal_ui.open(); + } + }, ); }); @@ -108,158 +216,47 @@ impl RedapServers { }); } - pub fn is_empty(&self) -> bool { - self.catalogs.lock().is_empty() - } - - pub fn is_collection_selected(&self) -> bool { - self.selected_collection - .as_ref() - .map(|handle| self.validate_handle(handle)) - .unwrap_or(false) - } + fn server_list_ui(&self, ui: &mut egui::Ui) { + let ctx = Context { + command_sender: &self.command_sender, + selected_collection: &self.selected_collection, + }; - fn validate_handle(&self, handle: &CollectionHandle) -> bool { - let catalogs = self.catalogs.lock(); - if let Some(catalog) = catalogs.get(&handle.server_origin) { - return catalog.collections.get(handle.collection_index).is_some(); + for server in self.servers.values() { + server.panel_ui(&ctx, ui); } - - false } - pub fn server_list_ui(&self, ui: &mut egui::Ui) { - for (origin, catalog) in self.catalogs.lock().iter() { - let content = list_item::LabelContent::new(origin.to_string()); - ui.list_item() - .interactive(false) - .show_hierarchical_with_children( - ui, - egui::Id::new(origin).with("server_item"), - true, - content, - |ui| { - self.catalog_list_ui(ui, origin, catalog); - }, - ); - } - } - - fn catalog_list_ui(&self, ui: &mut egui::Ui, origin: &redap::Origin, catalog: &Catalog) { - if catalog.is_empty() { - ui.list_item_flat_noninteractive(list_item::LabelContent::new("(empty)").italics(true)); - } else { - for (index, collection) in catalog.collections.iter().enumerate() { - let is_selected = - if let Some(selected_collection) = self.selected_collection.as_ref() { - selected_collection.server_origin == *origin - && selected_collection.collection_index == index - } else { - false - }; - - let content = list_item::LabelContent::new(&collection.name); - let response = ui.list_item().selected(is_selected).show_flat(ui, content); + pub fn ui(&mut self, ctx: &ViewerContext<'_>, ui: &mut egui::Ui) { + self.add_server_modal_ui(ui); - if response.clicked() { - self.command_queue - .lock() - .push(Command::SelectCollection(CollectionHandle { - server_origin: origin.clone(), - collection_index: index, - })); - } - } - } - - // deselect when clicking in empty space - let empty_space_response = ui.allocate_response(ui.available_size(), egui::Sense::click()); - - // clear selection upon clicking the empty space - if empty_space_response.clicked() { - self.command_queue.lock().push(Command::DeselectCollection); - } - } - - pub fn ui(&self, ctx: &ViewerContext<'_>, ui: &mut egui::Ui) { //TODO(ab): we should display something even if no catalog is currently selected. if let Some(selected_collection) = self.selected_collection.as_ref() { - let catalogs = self.catalogs.lock(); - if let Some(catalog) = catalogs.get(&selected_collection.server_origin) { - if let Some(collection) = catalog - .collections - .get(selected_collection.collection_index) - { - let mut commands = super::collection_ui::collection_ui( - ctx, - ui, - &selected_collection.server_origin, - collection, - ); - if !commands.is_empty() { - self.command_queue.lock().extend(commands.drain(..)); + for server in self.servers.values() { + let collection = server.find_collection(*selected_collection); + + if let Some(collection) = collection { + let mut commands = + super::collection_ui::collection_ui(ctx, ui, &server.origin, collection); + + //TODO: clean that up + for command in commands.drain(..) { + let _ = self.command_sender.send(command); } + + return; } } } } -} -async fn stream_catalog_async( - origin: redap::Origin, - catalogs: Arc>>, -) -> Result<(), StreamError> { - let mut client = origin.client().await?; + fn add_server_modal_ui(&mut self, ui: &egui::Ui) { + let ctx = Context { + command_sender: &self.command_sender, + selected_collection: &self.selected_collection, + }; - re_log::debug!("Fetching collection…"); - - let catalog_query_response = client - .query_catalog(QueryCatalogRequest { - column_projection: None, // fetch all columns - filter: None, // fetch all rows - }) - .await - .map_err(TonicStatusError)?; - - let sorbet_batches = catalog_query_response - .into_inner() - .map(|streaming_result| { - streaming_result - .and_then(|result| { - result - .decode() - .map_err(|err| tonic::Status::internal(err.to_string())) - }) - .map_err(TonicStatusError) - .map_err(StreamError::from) - }) - .map(|record_batch| { - record_batch.and_then(|record_batch| { - SorbetBatch::try_from_record_batch(&record_batch, BatchType::Dataframe) - .map_err(Into::into) - }) - }) - .collect::, _>>() - .await?; - - //TODO(ab): ideally this is provided by the server - let collection_id = egui::Id::new(origin.clone()).with("__top_level_collection__"); - let catalog = Catalog { - collections: vec![RecordingCollection { - collection_id, - //TODO(ab): this should be provided by the server - name: "default".to_owned(), - collection: sorbet_batches, - }], - }; - - let previous_catalog = catalogs.lock().insert(origin.clone(), catalog); - if previous_catalog.is_some() { - re_log::debug!("Updated catalog for {}.", origin.to_string()); - } else { - re_log::debug!("Fetched new catalog for {}.", origin.to_string()); + self.add_server_modal_ui.ui(&ctx, ui); } - - Ok(()) } diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs index 0595030e8529..9c1c2b2dc07b 100644 --- a/crates/viewer/re_viewer/src/app.rs +++ b/crates/viewer/re_viewer/src/app.rs @@ -585,9 +585,7 @@ impl App { match data_source.stream(Some(waker)) { Ok(re_data_source::StreamSource::LogMessages(rx)) => self.add_receiver(rx), Ok(re_data_source::StreamSource::CatalogData { origin: url }) => { - self.state - .redap_servers - .fetch_catalog(&self.async_runtime, url); + self.state.redap_servers.add_server(url); } Err(err) => { re_log::error!("Failed to open data source: {}", re_error::format(err)); @@ -1197,7 +1195,7 @@ impl App { #[cfg(not(target_arch = "wasm32"))] let is_history_enabled = false; - self.state.redap_servers.on_frame_start(); + self.state.redap_servers.on_frame_start(&self.async_runtime); render_ctx.begin_frame(); self.state.show( @@ -1731,10 +1729,8 @@ impl App { } } - pub fn fetch_catalog(&self, origin: re_grpc_client::redap::Origin) { - self.state - .redap_servers - .fetch_catalog(&self.async_runtime, origin); + pub fn add_redap_server(&mut self, origin: re_grpc_client::redap::Origin) { + self.state.redap_servers.add_server(origin); } } From cba8116a75b59094f6694c3c4f777bf6b0d26382 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Wed, 19 Feb 2025 17:51:06 +0100 Subject: [PATCH 03/15] Post-merge fix + cleanup --- crates/store/re_grpc_client/Cargo.toml | 2 +- .../re_redap_browser/src/collection_ui.rs | 18 +++++++--------- crates/viewer/re_redap_browser/src/servers.rs | 21 ++++++++++++------- crates/viewer/re_viewer/src/app.rs | 3 +-- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/crates/store/re_grpc_client/Cargo.toml b/crates/store/re_grpc_client/Cargo.toml index 9ef844a6c888..6887feeabf16 100644 --- a/crates/store/re_grpc_client/Cargo.toml +++ b/crates/store/re_grpc_client/Cargo.toml @@ -34,7 +34,7 @@ async-stream.workspace = true serde.workspace = true thiserror.workspace = true tokio-stream.workspace = true -url = { workspace = true, features = ["serde"] } +url = { workspace = true, features = ["serde"] } # Native dependencies: [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/crates/viewer/re_redap_browser/src/collection_ui.rs b/crates/viewer/re_redap_browser/src/collection_ui.rs index 044d587b0576..df8989927d8a 100644 --- a/crates/viewer/re_redap_browser/src/collection_ui.rs +++ b/crates/viewer/re_redap_browser/src/collection_ui.rs @@ -16,6 +16,7 @@ use re_viewer_context::ViewerContext; use super::servers::Command; use crate::collections::Collection; +use crate::context::Context; #[derive(thiserror::Error, Debug)] enum CollectionUiError { @@ -27,17 +28,16 @@ enum CollectionUiError { } pub fn collection_ui( - ctx: &ViewerContext<'_>, + viewer_ctx: &ViewerContext<'_>, + ctx: &Context<'_>, ui: &mut egui::Ui, origin: &redap::Origin, collection: &Collection, -) -> Vec { - let mut commands = vec![]; - +) { let sorbet_schema = { let Some(sorbet_batch) = collection.collection.first() else { ui.label(egui::RichText::new("This collection is empty").italics()); - return commands; + return; }; sorbet_batch.sorbet_schema() @@ -71,19 +71,19 @@ pub fn collection_ui( Err(err) => { //TODO(ab): better error handling? ui.error_label(err.to_string()); - return commands; + return; } }; let mut table_delegate = CollectionTableDelegate { - ctx, + ctx: viewer_ctx, display_record_batches: &display_record_batches, selected_columns: &columns, }; egui::Frame::new().inner_margin(5.0).show(ui, |ui| { if ui.button("Close").clicked() { - commands.push(Command::DeselectCollection); + let _ = ctx.command_sender.send(Command::DeselectCollection); } egui_table::Table::new() @@ -104,8 +104,6 @@ pub fn collection_ui( .num_rows(num_rows) .show(ui, &mut table_delegate); }); - - commands } /// Descriptor for the generated `RecordingUri` component. diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs index 3f31e71325d8..f0715da53349 100644 --- a/crates/viewer/re_redap_browser/src/servers.rs +++ b/crates/viewer/re_redap_browser/src/servers.rs @@ -227,7 +227,7 @@ impl RedapServers { } } - pub fn ui(&mut self, ctx: &ViewerContext<'_>, ui: &mut egui::Ui) { + pub fn ui(&mut self, viewer_ctx: &ViewerContext<'_>, ui: &mut egui::Ui) { self.add_server_modal_ui(ui); //TODO(ab): we should display something even if no catalog is currently selected. @@ -237,13 +237,18 @@ impl RedapServers { let collection = server.find_collection(*selected_collection); if let Some(collection) = collection { - let mut commands = - super::collection_ui::collection_ui(ctx, ui, &server.origin, collection); - - //TODO: clean that up - for command in commands.drain(..) { - let _ = self.command_sender.send(command); - } + let ctx = Context { + command_sender: &self.command_sender, + selected_collection: &self.selected_collection, + }; + + super::collection_ui::collection_ui( + viewer_ctx, + &ctx, + ui, + &server.origin, + collection, + ); return; } diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs index 9388b49cbab2..2c3e6904f044 100644 --- a/crates/viewer/re_viewer/src/app.rs +++ b/crates/viewer/re_viewer/src/app.rs @@ -574,8 +574,7 @@ impl App { self.state.display_mode = display_mode; } SystemCommand::AddRedapServer { origin } => { - self.redap_servers - .fetch_catalog(&self.async_runtime, origin); + self.state.redap_servers.add_server(origin); } SystemCommand::LoadDataSource(data_source) => { From 9ca9f077e0c266c9cfb2d854d3457ca7bfac301e Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Wed, 19 Feb 2025 18:00:58 +0100 Subject: [PATCH 04/15] Add `with_ctx` --- crates/viewer/re_redap_browser/src/servers.rs | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs index f0715da53349..f09c45cfab66 100644 --- a/crates/viewer/re_redap_browser/src/servers.rs +++ b/crates/viewer/re_redap_browser/src/servers.rs @@ -217,14 +217,11 @@ impl RedapServers { } fn server_list_ui(&self, ui: &mut egui::Ui) { - let ctx = Context { - command_sender: &self.command_sender, - selected_collection: &self.selected_collection, - }; - - for server in self.servers.values() { - server.panel_ui(&ctx, ui); - } + self.with_ctx(|ctx| { + for server in self.servers.values() { + server.panel_ui(ctx, ui); + } + }); } pub fn ui(&mut self, viewer_ctx: &ViewerContext<'_>, ui: &mut egui::Ui) { @@ -237,18 +234,15 @@ impl RedapServers { let collection = server.find_collection(*selected_collection); if let Some(collection) = collection { - let ctx = Context { - command_sender: &self.command_sender, - selected_collection: &self.selected_collection, - }; - - super::collection_ui::collection_ui( - viewer_ctx, - &ctx, - ui, - &server.origin, - collection, - ); + self.with_ctx(|ctx| { + super::collection_ui::collection_ui( + viewer_ctx, + ctx, + ui, + &server.origin, + collection, + ); + }); return; } @@ -257,6 +251,7 @@ impl RedapServers { } fn add_server_modal_ui(&mut self, ui: &egui::Ui) { + //TODO(ab): borrow checker doesn't let me use `with_ctx()` here, I should find a better way let ctx = Context { command_sender: &self.command_sender, selected_collection: &self.selected_collection, @@ -264,4 +259,14 @@ impl RedapServers { self.add_server_modal_ui.ui(&ctx, ui); } + + #[inline] + fn with_ctx(&self, func: impl FnOnce(&Context<'_>) -> R) -> R { + let ctx = Context { + command_sender: &self.command_sender, + selected_collection: &self.selected_collection, + }; + + func(&ctx) + } } From 8e205df2da389dfd155b41551cd05d17a0b5eff4 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 12:20:14 +0100 Subject: [PATCH 05/15] Improve add server modal UI --- Cargo.lock | 2 +- crates/store/re_grpc_client/Cargo.toml | 3 +- crates/viewer/re_redap_browser/Cargo.toml | 1 + .../re_redap_browser/src/add_server_modal.rs | 72 +++++++++++++++---- 4 files changed, 60 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5ad09812482..980666fa1457 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6171,7 +6171,6 @@ dependencies = [ "re_smart_channel", "re_sorbet", "re_uri", - "serde", "thiserror 1.0.65", "tokio", "tokio-stream", @@ -6458,6 +6457,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-web-wasm-client", + "url", ] [[package]] diff --git a/crates/store/re_grpc_client/Cargo.toml b/crates/store/re_grpc_client/Cargo.toml index b1992fc633cd..18fd9ce8afac 100644 --- a/crates/store/re_grpc_client/Cargo.toml +++ b/crates/store/re_grpc_client/Cargo.toml @@ -32,10 +32,9 @@ re_uri.workspace = true arrow.workspace = true async-stream.workspace = true -serde.workspace = true thiserror.workspace = true tokio-stream.workspace = true -url = { workspace = true, features = ["serde"] } +url.workspace = true # Native dependencies: [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/crates/viewer/re_redap_browser/Cargo.toml b/crates/viewer/re_redap_browser/Cargo.toml index cc3b27f9c336..695f726e258f 100644 --- a/crates/viewer/re_redap_browser/Cargo.toml +++ b/crates/viewer/re_redap_browser/Cargo.toml @@ -43,6 +43,7 @@ parking_lot.workspace = true serde.workspace = true thiserror.workspace = true tokio-stream.workspace = true +url.workspace = true # Native dependencies: [target.'cfg(not(target_arch = "wasm32"))'.dependencies] diff --git a/crates/viewer/re_redap_browser/src/add_server_modal.rs b/crates/viewer/re_redap_browser/src/add_server_modal.rs index 24abe72592ac..95db0cff4692 100644 --- a/crates/viewer/re_redap_browser/src/add_server_modal.rs +++ b/crates/viewer/re_redap_browser/src/add_server_modal.rs @@ -1,41 +1,83 @@ use re_ui::modal::{ModalHandler, ModalWrapper}; -use re_ui::UiExt; +use re_uri::Scheme; use crate::context::Context; use crate::servers::Command; -#[derive(Default)] pub struct AddServerModal { modal: ModalHandler, - url: String, + + scheme: Scheme, + host: String, + port: u16, +} + +impl Default for AddServerModal { + fn default() -> Self { + Self { + modal: Default::default(), + scheme: Scheme::Rerun, + host: String::new(), + port: 443, + } + } } impl AddServerModal { pub fn open(&mut self) { - self.url = "rerun://".to_owned(); + self.scheme = Scheme::Rerun; + self.port = 443; + self.host = String::new(); + self.modal.open(); } - //TODO(ab): make that UI a form with a scheme popup, a host text field, and a pre-filled port field //TODO(ab): handle ESC and return pub fn ui(&mut self, ctx: &Context<'_>, ui: &egui::Ui) { self.modal.ui( ui.ctx(), || ModalWrapper::new("Add Server"), |ui, keep_open| { - ui.label("URL:"); - ui.add(egui::TextEdit::singleline(&mut self.url).lock_focus(false)); + ui.label("Scheme:"); - let origin = re_uri::Origin::try_from(self.url.as_ref()); + egui::ComboBox::new("scheme", "") + .selected_text(if self.scheme == Scheme::RerunHttp { + "http" + } else { + "https" + }) + .show_ui(ui, |ui| { + ui.selectable_value(&mut self.scheme, Scheme::RerunHttps, "https"); + ui.selectable_value(&mut self.scheme, Scheme::RerunHttp, "http"); + }); - match &origin { - Ok(_) => { - ui.success_label("URL is valid"); - } - Err(err) => { - ui.error_label(format!("Unable to parse server URL: {err}")); + ui.add_space(14.0); + + ui.label("Host name:"); + let host = url::Host::parse(&self.host); + ui.scope(|ui| { + // make field red if host is invalid + if host.is_err() { + ui.visuals_mut().widgets.active.bg_stroke = + egui::Stroke::new(1.0, ui.visuals().error_fg_color); + ui.visuals_mut().widgets.hovered.bg_stroke = + egui::Stroke::new(1.0, ui.visuals().error_fg_color); + ui.visuals_mut().widgets.inactive.bg_stroke = + egui::Stroke::new(1.0, ui.visuals().error_fg_color); } - } + ui.add(egui::TextEdit::singleline(&mut self.host).lock_focus(false)); + }); + + ui.add_space(14.0); + + ui.label("Port:"); + ui.add(egui::DragValue::new(&mut self.port)); + + let origin = host.map(|host| re_uri::Origin { + scheme: self.scheme, + host, + port: self.port, + }); ui.add_space(24.0); From bc63ca397100c55a021c7b8cf099472e5975d5b0 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 12:25:33 +0100 Subject: [PATCH 06/15] lint --- crates/viewer/re_redap_browser/src/servers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs index 72e032300d0f..847315f8d3b3 100644 --- a/crates/viewer/re_redap_browser/src/servers.rs +++ b/crates/viewer/re_redap_browser/src/servers.rs @@ -152,7 +152,7 @@ impl RedapServers { /// Per-frame housekeeping. /// - /// - Process [`Command`]s from the queue. + /// - Process commands from the queue. /// - Load servers from `server_list`. /// - Update all servers. pub fn on_frame_start(&mut self, runtime: &AsyncRuntimeHandle) { From 1ab69dae83dc497d731269fda1b906a920f3894f Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 15:34:34 +0100 Subject: [PATCH 07/15] Add generic `RequestedObject` --- .../re_redap_browser/src/collections.rs | 65 +++++-------------- crates/viewer/re_redap_browser/src/lib.rs | 1 + .../re_redap_browser/src/requested_object.rs | 50 ++++++++++++++ 3 files changed, 66 insertions(+), 50 deletions(-) create mode 100644 crates/viewer/re_redap_browser/src/requested_object.rs diff --git a/crates/viewer/re_redap_browser/src/collections.rs b/crates/viewer/re_redap_browser/src/collections.rs index 75dda283de65..9acfcbd71496 100644 --- a/crates/viewer/re_redap_browser/src/collections.rs +++ b/crates/viewer/re_redap_browser/src/collections.rs @@ -1,7 +1,4 @@ -use std::sync::Arc; - use ahash::HashMap; -use parking_lot::Mutex; use tokio_stream::StreamExt as _; use re_grpc_client::{redap, StreamError, TonicStatusError}; @@ -12,6 +9,7 @@ use re_ui::{list_item, UiExt}; use re_viewer_context::AsyncRuntimeHandle; use crate::context::Context; +use crate::requested_object::RequestedObject; use crate::servers::Command; /// An id for a [`Collection`]. @@ -28,39 +26,11 @@ pub struct Collection { pub collection: Vec, } -/// A handle on an in-flight collection query. Contains `Some(Ok(_))` or `Some(Err(_))` once the -/// query has completed. -struct CollectionQueryHandle { - result: Arc>>>, -} - -impl CollectionQueryHandle { - /// Initiate a collection query call. - pub fn new(runtime: &AsyncRuntimeHandle, origin: re_uri::Origin) -> Self { - let result = Arc::new(Mutex::new(None)); - let handle = Self { - result: result.clone(), - }; - - runtime.spawn_future(async move { - let collection = stream_catalog_async(origin.clone()).await; - result.lock().replace(collection); - }); - - handle - } -} - -/// Either a [`Collection`] or a handle on the query to get it. -enum CollectionOrQueryHandle { - QueryHandle(CollectionQueryHandle), - Collection(Result), -} - /// A collection of [`Collection`]s. #[derive(Default)] pub struct Collections { - collections: HashMap, + //TODO(ab): these should be indexed by collection id + collections: HashMap>>, } impl Collections { @@ -68,21 +38,16 @@ impl Collections { //TODO(ab): should we return error if the requested collection already exists? Or maybe just // query it again. self.collections.entry(origin.clone()).or_insert_with(|| { - CollectionOrQueryHandle::QueryHandle(CollectionQueryHandle::new(runtime, origin)) + RequestedObject::new(runtime, stream_catalog_async(origin)) + + //CollectionOrQueryHandle::QueryHandle(CollectionQueryHandle::new(runtime, origin)) }); } /// Convert all completed queries into proper collections. pub fn on_frame_start(&mut self) { for collection in self.collections.values_mut() { - let result = match collection { - CollectionOrQueryHandle::QueryHandle(handle) => handle.result.lock().take(), - CollectionOrQueryHandle::Collection(_) => None, - }; - - if let Some(result) = result { - *collection = CollectionOrQueryHandle::Collection(result); - } + collection.on_frame_start(); } } @@ -90,23 +55,22 @@ impl Collections { pub fn find(&self, collection_id: CollectionId) -> Option<&Collection> { self.collections .values() - .filter_map(|handle| match handle { - CollectionOrQueryHandle::QueryHandle(_) => None, - CollectionOrQueryHandle::Collection(collection) => collection.as_ref().ok(), - }) + .filter_map(|handle| handle.try_as_ref()) + .filter_map(|result| result.as_ref().ok()) .find(|collection| collection.collection_id == collection_id) } /// [`list_item::ListItem`]-based UI for the collections. pub fn panel_ui(&self, ctx: &Context<'_>, ui: &mut egui::Ui) { for collection in self.collections.values() { - match collection { - CollectionOrQueryHandle::QueryHandle(_) => { + match collection.try_as_ref() { + None => { ui.list_item_flat_noninteractive( list_item::LabelContent::new("Loading default collection…").italics(true), ); } - CollectionOrQueryHandle::Collection(Ok(collection)) => { + + Some(Ok(collection)) => { let is_selected = *ctx.selected_collection == Some(collection.collection_id); let content = list_item::LabelContent::new(&collection.name); @@ -118,7 +82,8 @@ impl Collections { .send(Command::SelectCollection(collection.collection_id)); } } - CollectionOrQueryHandle::Collection(Err(err)) => { + + Some(Err(err)) => { ui.list_item_flat_noninteractive(list_item::LabelContent::new( egui::RichText::new("Failed to load").color(ui.visuals().error_fg_color), )) diff --git a/crates/viewer/re_redap_browser/src/lib.rs b/crates/viewer/re_redap_browser/src/lib.rs index 122e37cd120f..677d2554bace 100644 --- a/crates/viewer/re_redap_browser/src/lib.rs +++ b/crates/viewer/re_redap_browser/src/lib.rs @@ -5,6 +5,7 @@ mod add_server_modal; mod collection_ui; mod collections; mod context; +mod requested_object; mod servers; pub use servers::RedapServers; diff --git a/crates/viewer/re_redap_browser/src/requested_object.rs b/crates/viewer/re_redap_browser/src/requested_object.rs new file mode 100644 index 000000000000..59f4019fd3e6 --- /dev/null +++ b/crates/viewer/re_redap_browser/src/requested_object.rs @@ -0,0 +1,50 @@ +use std::sync::Arc; + +use parking_lot::Mutex; + +use re_viewer_context::AsyncRuntimeHandle; + +/// A handle to an object that is requested asynchronously. +pub enum RequestedObject { + Pending(Arc>>), + Completed(T), +} + +impl RequestedObject { + /// Create a new [`Self`] with the given future. + pub fn new(runtime: &AsyncRuntimeHandle, func: F) -> Self + where + F: std::future::Future + Send + 'static, + { + let result = Arc::new(Mutex::new(None)); + let handle = Self::Pending(result.clone()); + + runtime.spawn_future(async move { + let r = func.await; + result.lock().replace(r); + //TODO: refresh egui? + }); + + handle + } + + /// Check if the future has completed and, if so, update our state to [`Self::Completed`]. + pub fn on_frame_start(&mut self) { + let result = match self { + Self::Pending(handle) => handle.lock().take(), + Self::Completed(_) => None, + }; + + if let Some(result) = result { + *self = Self::Completed(result); + } + } + + /// Get a reference to the received object, if the request has completed. + pub fn try_as_ref(&self) -> Option<&T> { + match self { + Self::Pending(_) => None, + Self::Completed(result) => Some(result), + } + } +} From 520bb804d77103b15b00bd1dff8f87781833929c Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 15:47:06 +0100 Subject: [PATCH 08/15] Use n=1 channel instead of a mutex --- Cargo.lock | 1 + Cargo.toml | 1 + crates/viewer/re_redap_browser/Cargo.toml | 1 + .../re_redap_browser/src/requested_object.rs | 16 +++++++--------- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 980666fa1457..387b360419db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6436,6 +6436,7 @@ version = "0.23.0-alpha.1+dev" dependencies = [ "ahash", "arrow", + "crossbeam-channel", "egui", "egui_table", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 88fedcdee81f..549343ef4b44 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -183,6 +183,7 @@ const_format = "0.2" convert_case = "0.6" criterion = "0.5" crossbeam = "0.8" +crossbeam-channel = "0.5" directories = "5" document-features = "0.2.8" econtext = "0.2" # Prints error contexts on crashes diff --git a/crates/viewer/re_redap_browser/Cargo.toml b/crates/viewer/re_redap_browser/Cargo.toml index 695f726e258f..bc05b136f051 100644 --- a/crates/viewer/re_redap_browser/Cargo.toml +++ b/crates/viewer/re_redap_browser/Cargo.toml @@ -36,6 +36,7 @@ re_uri.workspace = true ahash.workspace = true arrow.workspace = true +crossbeam-channel.workspace = true egui.workspace = true egui_table.workspace = true once_cell.workspace = true diff --git a/crates/viewer/re_redap_browser/src/requested_object.rs b/crates/viewer/re_redap_browser/src/requested_object.rs index 59f4019fd3e6..3084dcd5ff3e 100644 --- a/crates/viewer/re_redap_browser/src/requested_object.rs +++ b/crates/viewer/re_redap_browser/src/requested_object.rs @@ -1,12 +1,10 @@ -use std::sync::Arc; - -use parking_lot::Mutex; +use crossbeam_channel::{bounded, Receiver}; use re_viewer_context::AsyncRuntimeHandle; /// A handle to an object that is requested asynchronously. pub enum RequestedObject { - Pending(Arc>>), + Pending(Receiver), Completed(T), } @@ -16,12 +14,12 @@ impl RequestedObject { where F: std::future::Future + Send + 'static, { - let result = Arc::new(Mutex::new(None)); - let handle = Self::Pending(result.clone()); + let (tx, rx) = bounded(1); + let handle = Self::Pending(rx); runtime.spawn_future(async move { - let r = func.await; - result.lock().replace(r); + let result = func.await; + let _ = tx.send(result); //TODO: refresh egui? }); @@ -31,7 +29,7 @@ impl RequestedObject { /// Check if the future has completed and, if so, update our state to [`Self::Completed`]. pub fn on_frame_start(&mut self) { let result = match self { - Self::Pending(handle) => handle.lock().take(), + Self::Pending(rx) => rx.recv().ok(), Self::Completed(_) => None, }; From 41527bcf8717fae6b9975e09328ff06837e3ee57 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 15:58:39 +0100 Subject: [PATCH 09/15] Clean up duplicate server list and better de/ser handling --- crates/viewer/re_redap_browser/src/servers.rs | 71 ++++++++----------- crates/viewer/re_viewer/src/app.rs | 2 +- 2 files changed, 32 insertions(+), 41 deletions(-) diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs index 847315f8d3b3..8ffdafe7b6f7 100644 --- a/crates/viewer/re_redap_browser/src/servers.rs +++ b/crates/viewer/re_redap_browser/src/servers.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::sync::mpsc::{Receiver, Sender}; use re_ui::{list_item, UiExt}; @@ -70,14 +70,6 @@ impl Server { /// All servers known to the viewer, and their catalog data. pub struct RedapServers { - /// The list of server. - /// - /// This is the only data persisted. Everything else being recreated on the fly. - server_list: BTreeSet, - - /// The actual servers, populated from the server list if needed. - /// - /// Servers in `server_list` are automatically added to `server` by `on_frame_start`. servers: BTreeMap, selected_collection: Option, @@ -94,7 +86,10 @@ impl serde::Serialize for RedapServers { where S: serde::Serializer, { - self.server_list.serialize(serializer) + self.servers + .keys() + .collect::>() + .serialize(serializer) } } @@ -103,16 +98,14 @@ impl<'de> serde::Deserialize<'de> for RedapServers { where D: serde::Deserializer<'de>, { - let server_list = BTreeSet::::deserialize(deserializer)?; - let (command_sender, command_receiver) = std::sync::mpsc::channel(); - Ok(Self { - server_list, - servers: BTreeMap::new(), - selected_collection: None, - command_sender, - command_receiver, - add_server_modal_ui: Default::default(), - }) + let origins = Vec::::deserialize(deserializer)?; + + let servers = Self::default(); + for origin in origins { + let _ = servers.command_sender.send(Command::AddServer(origin)); + } + + Ok(servers) } } @@ -121,7 +114,6 @@ impl Default for RedapServers { let (command_sender, command_receiver) = std::sync::mpsc::channel(); Self { - server_list: Default::default(), servers: Default::default(), selected_collection: None, command_sender, @@ -140,14 +132,8 @@ pub enum Command { impl RedapServers { /// Add a server to the hub. - pub fn add_server(&mut self, origin: re_uri::Origin) { - self.server_list.insert(origin); - } - - /// Remove a server from the hub. - pub fn remove_server(&mut self, origin: &re_uri::Origin) { - self.server_list.remove(origin); - self.servers.remove(origin); + pub fn add_server(&self, origin: re_uri::Origin) { + let _ = self.command_sender.send(Command::AddServer(origin)); } /// Per-frame housekeeping. @@ -157,14 +143,7 @@ impl RedapServers { /// - Update all servers. pub fn on_frame_start(&mut self, runtime: &AsyncRuntimeHandle) { while let Ok(command) = self.command_receiver.try_recv() { - self.handle_command(command); - } - - for origin in &self.server_list { - if !self.servers.contains_key(origin) { - self.servers - .insert(origin.clone(), Server::new(runtime, origin.clone())); - } + self.handle_command(runtime, command); } for server in self.servers.values_mut() { @@ -172,7 +151,7 @@ impl RedapServers { } } - fn handle_command(&mut self, command: Command) { + fn handle_command(&mut self, runtime: &AsyncRuntimeHandle, command: Command) { match command { Command::SelectCollection(collection_handle) => { self.selected_collection = Some(collection_handle); @@ -180,9 +159,21 @@ impl RedapServers { Command::DeselectCollection => self.selected_collection = None, - Command::AddServer(origin) => self.add_server(origin), + Command::AddServer(origin) => { + if !self.servers.contains_key(&origin) { + self.servers + .insert(origin.clone(), Server::new(runtime, origin.clone())); + } else { + re_log::warn!( + "Tried to add pre-existing sever at {:?}", + origin.to_string() + ); + } + } - Command::RemoveServer(origin) => self.remove_server(&origin), + Command::RemoveServer(origin) => { + self.servers.remove(&origin); + } } } diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs index 5b1cd177ca6b..6eea28268bba 100644 --- a/crates/viewer/re_viewer/src/app.rs +++ b/crates/viewer/re_viewer/src/app.rs @@ -1739,7 +1739,7 @@ impl App { } } - pub fn add_redap_server(&mut self, endpoint: re_uri::CatalogEndpoint) { + pub fn add_redap_server(&self, endpoint: re_uri::CatalogEndpoint) { self.state.redap_servers.add_server(endpoint.origin); } } From 78ee134b61d92fd87841e9dd2ffe491c252a252f Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 16:06:32 +0100 Subject: [PATCH 10/15] Add auto ui repaint --- .../re_redap_browser/src/collections.rs | 15 +++++++++---- .../re_redap_browser/src/requested_object.rs | 18 ++++++++++++++- crates/viewer/re_redap_browser/src/servers.rs | 22 ++++++++++++------- crates/viewer/re_viewer/src/app.rs | 4 +++- 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/crates/viewer/re_redap_browser/src/collections.rs b/crates/viewer/re_redap_browser/src/collections.rs index 9acfcbd71496..5968275bc7b2 100644 --- a/crates/viewer/re_redap_browser/src/collections.rs +++ b/crates/viewer/re_redap_browser/src/collections.rs @@ -34,13 +34,20 @@ pub struct Collections { } impl Collections { - pub fn add(&mut self, runtime: &AsyncRuntimeHandle, origin: re_uri::Origin) { + pub fn add( + &mut self, + runtime: &AsyncRuntimeHandle, + egui_ctx: &egui::Context, + origin: re_uri::Origin, + ) { //TODO(ab): should we return error if the requested collection already exists? Or maybe just // query it again. self.collections.entry(origin.clone()).or_insert_with(|| { - RequestedObject::new(runtime, stream_catalog_async(origin)) - - //CollectionOrQueryHandle::QueryHandle(CollectionQueryHandle::new(runtime, origin)) + RequestedObject::new_with_repaint( + runtime, + egui_ctx.clone(), + stream_catalog_async(origin), + ) }); } diff --git a/crates/viewer/re_redap_browser/src/requested_object.rs b/crates/viewer/re_redap_browser/src/requested_object.rs index 3084dcd5ff3e..579d21e479a9 100644 --- a/crates/viewer/re_redap_browser/src/requested_object.rs +++ b/crates/viewer/re_redap_browser/src/requested_object.rs @@ -20,12 +20,28 @@ impl RequestedObject { runtime.spawn_future(async move { let result = func.await; let _ = tx.send(result); - //TODO: refresh egui? }); handle } + /// Create a new [`Self`] with the given future and automatically request a repaint of the UI + /// when the future completes. + pub fn new_with_repaint( + runtime: &AsyncRuntimeHandle, + egui_ctx: egui::Context, + func: F, + ) -> Self + where + F: std::future::Future + Send + 'static, + { + Self::new(runtime, async move { + let result = func.await; + egui_ctx.request_repaint(); + result + }) + } + /// Check if the future has completed and, if so, update our state to [`Self::Completed`]. pub fn on_frame_start(&mut self) { let result = match self { diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs index 8ffdafe7b6f7..35973cb76ae0 100644 --- a/crates/viewer/re_redap_browser/src/servers.rs +++ b/crates/viewer/re_redap_browser/src/servers.rs @@ -15,13 +15,13 @@ struct Server { } impl Server { - fn new(runtime: &AsyncRuntimeHandle, origin: re_uri::Origin) -> Self { + fn new(runtime: &AsyncRuntimeHandle, egui_ctx: &egui::Context, origin: re_uri::Origin) -> Self { //let default_catalog = FetchCollectionTask::new(runtime, origin.clone()); let mut collections = Collections::default(); //TODO(ab): For now, we just auto-download the default collection - collections.add(runtime, origin.clone()); + collections.add(runtime, egui_ctx, origin.clone()); Self { origin, @@ -139,11 +139,10 @@ impl RedapServers { /// Per-frame housekeeping. /// /// - Process commands from the queue. - /// - Load servers from `server_list`. /// - Update all servers. - pub fn on_frame_start(&mut self, runtime: &AsyncRuntimeHandle) { + pub fn on_frame_start(&mut self, runtime: &AsyncRuntimeHandle, egui_ctx: &egui::Context) { while let Ok(command) = self.command_receiver.try_recv() { - self.handle_command(runtime, command); + self.handle_command(runtime, egui_ctx, command); } for server in self.servers.values_mut() { @@ -151,7 +150,12 @@ impl RedapServers { } } - fn handle_command(&mut self, runtime: &AsyncRuntimeHandle, command: Command) { + fn handle_command( + &mut self, + runtime: &AsyncRuntimeHandle, + egui_ctx: &egui::Context, + command: Command, + ) { match command { Command::SelectCollection(collection_handle) => { self.selected_collection = Some(collection_handle); @@ -161,8 +165,10 @@ impl RedapServers { Command::AddServer(origin) => { if !self.servers.contains_key(&origin) { - self.servers - .insert(origin.clone(), Server::new(runtime, origin.clone())); + self.servers.insert( + origin.clone(), + Server::new(runtime, egui_ctx, origin.clone()), + ); } else { re_log::warn!( "Tried to add pre-existing sever at {:?}", diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs index 6eea28268bba..9d6c4f1b9ced 100644 --- a/crates/viewer/re_viewer/src/app.rs +++ b/crates/viewer/re_viewer/src/app.rs @@ -1205,7 +1205,9 @@ impl App { #[cfg(not(target_arch = "wasm32"))] let is_history_enabled = false; - self.state.redap_servers.on_frame_start(&self.async_runtime); + self.state + .redap_servers + .on_frame_start(&self.async_runtime, &self.egui_ctx); render_ctx.begin_frame(); self.state.show( From b49c08af8e022a2a0a29c67a9dfacfb231e33f7f Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 16:10:00 +0100 Subject: [PATCH 11/15] remove useless crate --- Cargo.lock | 1 - crates/viewer/re_redap_browser/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 387b360419db..c4d3de8116ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6440,7 +6440,6 @@ dependencies = [ "egui", "egui_table", "once_cell", - "parking_lot", "re_arrow_util", "re_grpc_client", "re_log", diff --git a/crates/viewer/re_redap_browser/Cargo.toml b/crates/viewer/re_redap_browser/Cargo.toml index bc05b136f051..778ccae3ed20 100644 --- a/crates/viewer/re_redap_browser/Cargo.toml +++ b/crates/viewer/re_redap_browser/Cargo.toml @@ -40,7 +40,6 @@ crossbeam-channel.workspace = true egui.workspace = true egui_table.workspace = true once_cell.workspace = true -parking_lot.workspace = true serde.workspace = true thiserror.workspace = true tokio-stream.workspace = true From 575da7a1109ef39577ea0b52824b99e905b8542d Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 16:18:59 +0100 Subject: [PATCH 12/15] Fix wasm build --- .../re_redap_browser/src/requested_object.rs | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/crates/viewer/re_redap_browser/src/requested_object.rs b/crates/viewer/re_redap_browser/src/requested_object.rs index 579d21e479a9..d0179eb780e5 100644 --- a/crates/viewer/re_redap_browser/src/requested_object.rs +++ b/crates/viewer/re_redap_browser/src/requested_object.rs @@ -10,6 +10,7 @@ pub enum RequestedObject { impl RequestedObject { /// Create a new [`Self`] with the given future. + #[cfg(not(target_arch = "wasm32"))] pub fn new(runtime: &AsyncRuntimeHandle, func: F) -> Self where F: std::future::Future + Send + 'static, @@ -27,6 +28,7 @@ impl RequestedObject { /// Create a new [`Self`] with the given future and automatically request a repaint of the UI /// when the future completes. + #[cfg(not(target_arch = "wasm32"))] pub fn new_with_repaint( runtime: &AsyncRuntimeHandle, egui_ctx: egui::Context, @@ -42,6 +44,41 @@ impl RequestedObject { }) } + /// Create a new [`Self`] with the given future. + #[cfg(target_arch = "wasm32")] + pub fn new(runtime: &AsyncRuntimeHandle, func: F) -> Self + where + F: std::future::Future + 'static, + { + let (tx, rx) = bounded(1); + let handle = Self::Pending(rx); + + runtime.spawn_future(async move { + let result = func.await; + let _ = tx.send(result); + }); + + handle + } + + /// Create a new [`Self`] with the given future and automatically request a repaint of the UI + /// when the future completes. + #[cfg(target_arch = "wasm32")] + pub fn new_with_repaint( + runtime: &AsyncRuntimeHandle, + egui_ctx: egui::Context, + func: F, + ) -> Self + where + F: std::future::Future + 'static, + { + Self::new(runtime, async move { + let result = func.await; + egui_ctx.request_repaint(); + result + }) + } + /// Check if the future has completed and, if so, update our state to [`Self::Completed`]. pub fn on_frame_start(&mut self) { let result = match self { From a77ad2015101f6eb4e59b4b59d521bcf64e638b5 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 16:32:42 +0100 Subject: [PATCH 13/15] Introduced `WasmNotSend` trait to fix that elusive, platform-dependent `Send`-ness --- .../re_redap_browser/src/requested_object.rs | 43 ++----------------- .../src/async_runtime_handle.rs | 16 ++++++- crates/viewer/re_viewer_context/src/lib.rs | 2 +- 3 files changed, 18 insertions(+), 43 deletions(-) diff --git a/crates/viewer/re_redap_browser/src/requested_object.rs b/crates/viewer/re_redap_browser/src/requested_object.rs index d0179eb780e5..043496fbf036 100644 --- a/crates/viewer/re_redap_browser/src/requested_object.rs +++ b/crates/viewer/re_redap_browser/src/requested_object.rs @@ -1,6 +1,6 @@ use crossbeam_channel::{bounded, Receiver}; -use re_viewer_context::AsyncRuntimeHandle; +use re_viewer_context::{AsyncRuntimeHandle, WasmNotSend}; /// A handle to an object that is requested asynchronously. pub enum RequestedObject { @@ -10,10 +10,9 @@ pub enum RequestedObject { impl RequestedObject { /// Create a new [`Self`] with the given future. - #[cfg(not(target_arch = "wasm32"))] pub fn new(runtime: &AsyncRuntimeHandle, func: F) -> Self where - F: std::future::Future + Send + 'static, + F: std::future::Future + WasmNotSend + 'static, { let (tx, rx) = bounded(1); let handle = Self::Pending(rx); @@ -28,49 +27,13 @@ impl RequestedObject { /// Create a new [`Self`] with the given future and automatically request a repaint of the UI /// when the future completes. - #[cfg(not(target_arch = "wasm32"))] pub fn new_with_repaint( runtime: &AsyncRuntimeHandle, egui_ctx: egui::Context, func: F, ) -> Self where - F: std::future::Future + Send + 'static, - { - Self::new(runtime, async move { - let result = func.await; - egui_ctx.request_repaint(); - result - }) - } - - /// Create a new [`Self`] with the given future. - #[cfg(target_arch = "wasm32")] - pub fn new(runtime: &AsyncRuntimeHandle, func: F) -> Self - where - F: std::future::Future + 'static, - { - let (tx, rx) = bounded(1); - let handle = Self::Pending(rx); - - runtime.spawn_future(async move { - let result = func.await; - let _ = tx.send(result); - }); - - handle - } - - /// Create a new [`Self`] with the given future and automatically request a repaint of the UI - /// when the future completes. - #[cfg(target_arch = "wasm32")] - pub fn new_with_repaint( - runtime: &AsyncRuntimeHandle, - egui_ctx: egui::Context, - func: F, - ) -> Self - where - F: std::future::Future + 'static, + F: std::future::Future + WasmNotSend + 'static, { Self::new(runtime, async move { let result = func.await; diff --git a/crates/viewer/re_viewer_context/src/async_runtime_handle.rs b/crates/viewer/re_viewer_context/src/async_runtime_handle.rs index 1d143248e470..8b5de7b3c803 100644 --- a/crates/viewer/re_viewer_context/src/async_runtime_handle.rs +++ b/crates/viewer/re_viewer_context/src/async_runtime_handle.rs @@ -1,3 +1,15 @@ +#[cfg(not(target_arch = "wasm32"))] +pub trait WasmNotSend: Send {} + +#[cfg(target_arch = "wasm32")] +pub trait WasmNotSend {} + +#[cfg(not(target_arch = "wasm32"))] +impl WasmNotSend for T {} + +#[cfg(target_arch = "wasm32")] +impl WasmNotSend for T {} + #[derive(Debug, thiserror::Error)] pub enum AsyncRuntimeError { /// Tokio returned an error. @@ -48,7 +60,7 @@ impl AsyncRuntimeHandle { #[expect(clippy::unused_self)] pub fn spawn_future(&self, future: F) where - F: std::future::Future + 'static, + F: std::future::Future + WasmNotSend + 'static, { wasm_bindgen_futures::spawn_local(future); } @@ -56,7 +68,7 @@ impl AsyncRuntimeHandle { #[cfg(not(target_arch = "wasm32"))] pub fn spawn_future(&self, future: F) where - F: std::future::Future + 'static + Send, + F: std::future::Future + WasmNotSend + 'static, { self.tokio.spawn(future); } diff --git a/crates/viewer/re_viewer_context/src/lib.rs b/crates/viewer/re_viewer_context/src/lib.rs index 9aab6ef75093..79e06ba3922a 100644 --- a/crates/viewer/re_viewer_context/src/lib.rs +++ b/crates/viewer/re_viewer_context/src/lib.rs @@ -38,7 +38,7 @@ mod visitor_flow_control; pub use self::{ annotations::{AnnotationMap, Annotations, ResolvedAnnotationInfo, ResolvedAnnotationInfos}, - async_runtime_handle::{AsyncRuntimeError, AsyncRuntimeHandle}, + async_runtime_handle::{AsyncRuntimeError, AsyncRuntimeHandle, WasmNotSend}, blueprint_helpers::{blueprint_timeline, blueprint_timepoint_for_writes}, blueprint_id::{BlueprintId, BlueprintIdRegistry, ContainerId, ViewId}, cache::{Cache, Caches, ImageDecodeCache, ImageStatsCache, TensorStatsCache, VideoCache}, From 2ec97ee57c8560be8cf5ccdebd0a22483d1858a1 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 16:49:19 +0100 Subject: [PATCH 14/15] add comment --- crates/viewer/re_redap_browser/src/servers.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/viewer/re_redap_browser/src/servers.rs b/crates/viewer/re_redap_browser/src/servers.rs index 35973cb76ae0..7ae2a767be90 100644 --- a/crates/viewer/re_redap_browser/src/servers.rs +++ b/crates/viewer/re_redap_browser/src/servers.rs @@ -101,6 +101,9 @@ impl<'de> serde::Deserialize<'de> for RedapServers { let origins = Vec::::deserialize(deserializer)?; let servers = Self::default(); + + // We cannot create `Server` right away, because we need an async handle and an + // `egui::Context` for that, so we just queue commands to be processed early next frame. for origin in origins { let _ = servers.command_sender.send(Command::AddServer(origin)); } From bec44add3a64406759037d0e4e4676d34fd977c9 Mon Sep 17 00:00:00 2001 From: Antoine Beyeler Date: Thu, 20 Feb 2025 20:56:49 +0100 Subject: [PATCH 15/15] duh --- crates/viewer/re_redap_browser/src/requested_object.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/viewer/re_redap_browser/src/requested_object.rs b/crates/viewer/re_redap_browser/src/requested_object.rs index 043496fbf036..c24f6dd7158a 100644 --- a/crates/viewer/re_redap_browser/src/requested_object.rs +++ b/crates/viewer/re_redap_browser/src/requested_object.rs @@ -45,7 +45,7 @@ impl RequestedObject { /// Check if the future has completed and, if so, update our state to [`Self::Completed`]. pub fn on_frame_start(&mut self) { let result = match self { - Self::Pending(rx) => rx.recv().ok(), + Self::Pending(rx) => rx.try_recv().ok(), Self::Completed(_) => None, };