Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataLoaders 5: add support for external binary DataLoaders (PATH) #4521

Merged
merged 14 commits into from
Dec 15, 2023
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

190 changes: 190 additions & 0 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use std::io::Read;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should document somewhere what we expect from a loader, e.g. in a `//!`-level docstring.

In particular:

  • what arguments to expect (--recording-id)
  • what exit code to use for unsupported file types
  • what and where to output (.rrd data to stdout)

This should also be added as a how-to guide on rerun.io/docs!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an upcoming PR that adds a web guide; I'll add a link to that guide in the docstring then.


use once_cell::sync::Lazy;

/// Filename prefix that identifies an executable as an external data loader.
///
/// To register a new external data loader, simply add an executable in your `$PATH` whose name
/// starts with this prefix.
pub const EXTERNAL_DATA_LOADER_PREFIX: &str = "rerun-loader-";

/// Keeps track of the paths of all external executable [`crate::DataLoader`]s.
///
/// Lazily initialized the first time a file is opened, by scanning every directory listed in
/// the `PATH` environment variable for files whose name starts with
/// [`EXTERNAL_DATA_LOADER_PREFIX`].
///
/// Only the direct entries of each `PATH` directory are considered, which matches how
/// executables are actually resolved from `PATH`: subdirectories are never searched.
///
/// External loaders are _not_ registered on a per-extension basis: we want users to be able to
/// filter data on a much more fine-grained basis than just file extensions (e.g. checking the
/// file itself for magic bytes).
pub static EXTERNAL_LOADER_PATHS: Lazy<Vec<std::path::PathBuf>> = Lazy::new(|| {
    re_tracing::profile_function!();

    use walkdir::WalkDir;

    // `split_paths` applies the platform's own `PATH` conventions (`:` on Unix, `;` on Windows),
    // and `var_os` keeps working even when `PATH` contains entries that are not valid UTF-8.
    let dirpaths: Vec<std::path::PathBuf> = std::env::var_os("PATH")
        .map(|paths| std::env::split_paths(&paths).collect())
        .unwrap_or_default();

    // Collect into a set first: the same directory may be listed several times in `PATH`.
    let executables: ahash::HashSet<_> = dirpaths
        .into_iter()
        .flat_map(|dirpath| {
            // `max_depth(1)`: look at the directory's own entries only, never recurse.
            WalkDir::new(dirpath)
                .max_depth(1)
                .into_iter()
                .filter_map(|entry| {
                    // Unreadable entries (missing directory, permission denied, …) are skipped.
                    let entry = entry.ok()?;
                    let filepath = entry.path();
                    let is_rerun_loader = filepath.file_name().map_or(false, |filename| {
                        filename
                            .to_string_lossy()
                            .starts_with(EXTERNAL_DATA_LOADER_PREFIX)
                    });
                    (filepath.is_file() && is_rerun_loader).then(|| filepath.to_owned())
                })
        })
        .collect();

    // NOTE: We call all available loaders and do so in parallel: order is irrelevant here.
    executables.into_iter().collect()
});

/// Yields an owned copy of the path of every registered external [`crate::DataLoader`].
///
/// The paths come from [`EXTERNAL_LOADER_PATHS`], which is initialized on first access.
#[inline]
pub fn iter_external_loaders() -> impl ExactSizeIterator<Item = std::path::PathBuf> {
    let registered: &Vec<std::path::PathBuf> = &EXTERNAL_LOADER_PATHS;
    registered.iter().cloned()
}

// ---

/// A [`crate::DataLoader`] that forwards the path to load to all executables present in
/// the user's `PATH` with a name that starts with `EXTERNAL_DATA_LOADER_PREFIX`.
///
/// The external loaders are expected to log rrd data to their standard output.
///
/// Refer to our `external_data_loader` example for more information.
pub struct ExternalLoader;

impl crate::DataLoader for ExternalLoader {
#[inline]
fn name(&self) -> String {
"rerun.data_loaders.External".into()
}

fn load_from_path(
&self,
store_id: re_log_types::StoreId,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
use std::process::{Command, Stdio};

re_tracing::profile_function!(filepath.display().to_string());

for exe in EXTERNAL_LOADER_PATHS.iter() {
let store_id = store_id.clone();
let filepath = filepath.clone();
let tx = tx.clone();

// NOTE: spawn is fine, the entire loader is native-only.
rayon::spawn(move || {
re_tracing::profile_function!();

let child = Command::new(exe)
teh-cmc marked this conversation as resolved.
Show resolved Hide resolved
.arg(filepath.clone())
.args(["--recording-id".to_owned(), store_id.to_string()])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();

let mut child = match child {
Ok(child) => child,
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};

let Some(stdout) = child.stdout.take() else {
let reason = "stdout unreachable";
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
return;
};
let Some(stderr) = child.stderr.take() else {
let reason = "stderr unreachable";
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
return;
};

re_log::debug!(?filepath, loader = ?exe, "Loading data from filesystem using external loader…",);

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Ok(decoder) => {
decode_and_stream(&filepath, &tx, decoder);
}
Err(re_log_encoding::decoder::DecodeError::Read(_)) => {
// The child was not interested in that file and left without logging
// anything.
// That's fine, we just need to make sure to check its exit status further
// down, still.
return;
}
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to decode external loader's output");
return;
}
};

let status = match child.wait() {
Ok(output) => output,
Err(err) => {
re_log::error!(?filepath, loader = ?exe, %err, "Failed to execute external loader");
return;
}
};

if !status.success() {
let mut stderr = std::io::BufReader::new(stderr);
let mut reason = String::new();
stderr.read_to_string(&mut reason).ok();
re_log::error!(?filepath, loader = ?exe, %reason, "Failed to execute external loader");
}
});
}

Ok(())
}

#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): You could imagine a world where plugins can be streamed rrd data via their
// standard input… but today is not world.
Ok(()) // simply not interested
}
}

/// Drains `decoder`, forwarding every successfully decoded message through `tx`.
///
/// Messages that fail to decode are reported with a warning and skipped. Streaming stops as
/// soon as the receiving end of `tx` has been dropped.
fn decode_and_stream<R: std::io::Read>(
    filepath: &std::path::Path,
    tx: &std::sync::mpsc::Sender<crate::LoadedData>,
    decoder: re_log_encoding::decoder::Decoder<R>,
) {
    re_tracing::profile_function!(filepath.display().to_string());

    for decoded in decoder {
        match decoded {
            Ok(msg) => {
                let receiver_gone = tx.send(msg.into()).is_err();
                if receiver_gone {
                    // The other end has decided to hang up, not our problem.
                    return;
                }
            }
            Err(err) => {
                re_log::warn_once!("Failed to decode message in {filepath:?}: {err}");
            }
        }
    }
}
20 changes: 18 additions & 2 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ use re_log_types::{ArrowMsg, DataRow, LogMsg};
/// [There are plans to make this generic over any URI](https://github.com/rerun-io/rerun/issues/4525).
///
/// Rerun comes with a few [`DataLoader`]s by default:
/// - [`RrdLoader`] for [Rerun files],
/// - [`RrdLoader`] for [Rerun files].
/// - [`ArchetypeLoader`] for:
/// - [3D models]
/// - [Images]
/// - [Point clouds]
/// - [Text files]
/// - [`DirectoryLoader`] for recursively loading folders.
/// - [`ExternalLoader`], which looks for user-defined data loaders in $PATH.
///
/// ## Execution
///
Expand All @@ -36,9 +39,10 @@ use re_log_types::{ArrowMsg, DataRow, LogMsg};
///
/// On native, [`DataLoader`]s are executed in parallel.
///
/// [Rerun extensions]: crate::SUPPORTED_RERUN_EXTENSIONS
/// [Rerun files]: crate::SUPPORTED_RERUN_EXTENSIONS
/// [3D models]: crate::SUPPORTED_MESH_EXTENSIONS
/// [Images]: crate::SUPPORTED_IMAGE_EXTENSIONS
/// [Point clouds]: crate::SUPPORTED_POINT_CLOUD_EXTENSIONS
/// [Text files]: crate::SUPPORTED_TEXT_EXTENSIONS
//
// TODO(#4525): `DataLoader`s should support arbitrary URIs
Expand Down Expand Up @@ -206,6 +210,8 @@ static BUILTIN_LOADERS: Lazy<Vec<Arc<dyn DataLoader>>> = Lazy::new(|| {
Arc::new(RrdLoader) as Arc<dyn DataLoader>,
Arc::new(ArchetypeLoader),
Arc::new(DirectoryLoader),
#[cfg(not(target_arch = "wasm32"))]
Arc::new(ExternalLoader),
]
});

Expand All @@ -221,6 +227,16 @@ mod loader_archetype;
mod loader_directory;
mod loader_rrd;

#[cfg(not(target_arch = "wasm32"))]
mod loader_external;

pub use self::loader_archetype::ArchetypeLoader;
pub use self::loader_directory::DirectoryLoader;
pub use self::loader_rrd::RrdLoader;

#[cfg(not(target_arch = "wasm32"))]
pub(crate) use self::loader_external::EXTERNAL_LOADER_PATHS;
#[cfg(not(target_arch = "wasm32"))]
pub use self::loader_external::{
iter_external_loaders, ExternalLoader, EXTERNAL_DATA_LOADER_PREFIX,
};
6 changes: 5 additions & 1 deletion crates/re_data_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ mod web_sockets;
mod load_stdin;

pub use self::data_loader::{
iter_loaders, ArchetypeLoader, DataLoader, DataLoaderError, LoadedData, RrdLoader,
iter_loaders, ArchetypeLoader, DataLoader, DataLoaderError, DirectoryLoader, LoadedData,
RrdLoader,
};
pub use self::data_source::DataSource;
pub use self::load_file::{extension, load_from_file_contents};
pub use self::web_sockets::connect_to_ws_url;

#[cfg(not(target_arch = "wasm32"))]
pub use self::data_loader::{iter_external_loaders, ExternalLoader};

#[cfg(not(target_arch = "wasm32"))]
pub use self::load_file::load_from_path;

Expand Down
9 changes: 8 additions & 1 deletion crates/re_data_source/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,17 @@ pub(crate) fn load(
is_dir: bool,
contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> Result<std::sync::mpsc::Receiver<LoadedData>, DataLoaderError> {
#[cfg(target_arch = "wasm32")]
let has_external_loaders = false;
#[cfg(not(target_arch = "wasm32"))]
let has_external_loaders = !crate::data_loader::EXTERNAL_LOADER_PATHS.is_empty();

let extension = extension(path);
let is_builtin = is_associated_with_builtin_loader(path, is_dir);

if !is_builtin {
// If there are no external loaders registered (which is always the case on wasm) and we don't
// have a builtin loader for it, then we know for a fact that we won't be able to load it.
if !is_builtin && !has_external_loaders {
return if extension.is_empty() {
Err(anyhow::anyhow!("files without extensions (file.XXX) are not supported").into())
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/re_ui/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl UICommand {
"Save data for the current loop selection to a Rerun data file (.rrd)",
),

UICommand::Open => ("Open…", "Open a Rerun Data File (.rrd)"),
UICommand::Open => ("Open…", "Open any supported files (.rrd, images, meshes, …)"),

UICommand::CloseCurrentRecording => (
"Close current Recording",
Expand Down
20 changes: 15 additions & 5 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,11 +1297,21 @@ fn file_saver_progress_ui(egui_ctx: &egui::Context, background_tasks: &mut Backg
#[cfg(not(target_arch = "wasm32"))]
fn open_file_dialog_native() -> Vec<std::path::PathBuf> {
re_tracing::profile_function!();
let supported: Vec<_> = re_data_source::supported_extensions().collect();
rfd::FileDialog::new()
.add_filter("Supported files", &supported)
.pick_files()
.unwrap_or_default()

let supported: Vec<_> = if re_data_source::iter_external_loaders().len() == 0 {
re_data_source::supported_extensions().collect()
} else {
vec![]
};

let mut dialog = rfd::FileDialog::new();

// If there's at least one external loader registered, then literally anything goes!
if !supported.is_empty() {
dialog = dialog.add_filter("Supported files", &supported);
}

dialog.pick_files().unwrap_or_default()
}

#[cfg(target_arch = "wasm32")]
Expand Down
4 changes: 4 additions & 0 deletions examples/assets/example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Prints a two-line hint pointing the reader at the `external_data_loader` C++ example.
//
// NOTE(review): `std::cout` is used without a visible `#include <iostream>`; presumably this
// file is only meant to be opened as data by a loader plugin rather than compiled — confirm.
int main() {
std::cout << "That will only work with the right plugin in your $PATH!" << std::endl;
std::cout << "Checkout the `external_data_loader` C++ example." << std::endl;
}
4 changes: 4 additions & 0 deletions examples/assets/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from __future__ import annotations

# Print a two-line hint pointing the reader at the `external_data_loader` Python example.
print("That will only work with the right plugin in your $PATH!")
print("Checkout the `external_data_loader` Python example.")
4 changes: 4 additions & 0 deletions examples/assets/example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Prints a two-line hint pointing the reader at the `external_data_loader` Rust example.
fn main() {
println!("That will only work with the right plugin in your $PATH!");
println!("Checkout the `external_data_loader` Rust example.");
}
2 changes: 2 additions & 0 deletions examples/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_subdirectory(clock)
add_subdirectory(custom_collection_adapter)
add_subdirectory(dna)
add_subdirectory(external_data_loader)
add_subdirectory(minimal)
add_subdirectory(shared_recording)
add_subdirectory(spawn_viewer)
Expand All @@ -15,3 +16,4 @@ add_dependencies(examples example_minimal)
add_dependencies(examples example_shared_recording)
add_dependencies(examples example_spawn_viewer)
add_dependencies(examples example_stdio)
add_dependencies(examples rerun-loader-cpp-file)
32 changes: 32 additions & 0 deletions examples/cpp/external_data_loader/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Builds the `rerun-loader-cpp-file` executable from `main.cpp` and links it against `rerun_sdk`.
cmake_minimum_required(VERSION 3.16...3.27)

# If you use the example outside of the Rerun SDK you need to specify
# where the rerun_c build is to be found by setting the `RERUN_CPP_URL` variable.
# This can be done by passing `-DRERUN_CPP_URL=<path to rerun_sdk_cpp zip>` to cmake.
if(DEFINED RERUN_REPOSITORY)
# `RERUN_REPOSITORY` is defined: only declare the target and apply the strict warning settings.
# NOTE(review): presumably `rerun_sdk` and `rerun_strict_warning_settings` are provided by the
# enclosing build in this case — confirm.
add_executable(rerun-loader-cpp-file main.cpp)
rerun_strict_warning_settings(rerun-loader-cpp-file)
else()
# Standalone build: declare our own project and obtain `rerun_sdk` ourselves.
project(rerun-loader-cpp-file LANGUAGES CXX)

add_executable(rerun-loader-cpp-file main.cpp)

# Set the path to the rerun_c build.
set(RERUN_CPP_URL "https://github.com/rerun-io/rerun/releases/latest/download/rerun_cpp_sdk.zip" CACHE STRING "URL to the rerun_cpp zip.")
option(RERUN_FIND_PACKAGE "Whether to use find_package to find a preinstalled rerun package (instead of using FetchContent)." OFF)

if(RERUN_FIND_PACKAGE)
# Use a preinstalled rerun package.
find_package(rerun_sdk REQUIRED)
else()
# Download the rerun_sdk
include(FetchContent)
FetchContent_Declare(rerun_sdk URL ${RERUN_CPP_URL})
FetchContent_MakeAvailable(rerun_sdk)
endif()

# Rerun requires at least C++17, but it should be compatible with newer versions.
set_property(TARGET rerun-loader-cpp-file PROPERTY CXX_STANDARD 17)
endif()

# Link against rerun_sdk.
target_link_libraries(rerun-loader-cpp-file PRIVATE rerun_sdk)
Loading
Loading