Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(filesystem): Use channels to communicate within webdav filesystem #1361

Merged
merged 5 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/commands/webdav.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// ignore markdown clippy lints as we use doc-comments to generate clap help texts
#![allow(clippy::doc_markdown)]

mod webdavfs;
use webdavfs::WebDavFS;

use std::net::ToSocketAddrs;

use crate::{repository::CliIndexedRepo, status_err, Application, RusticConfig, RUSTIC_APP};
Expand All @@ -13,9 +16,6 @@ use dav_server::{warp::dav_handler, DavHandler};
use serde::{Deserialize, Serialize};

use rustic_core::vfs::{FilePolicy, IdenticalSnapshot, Latest, Vfs};
use webdavfs::WebDavFS;

mod webdavfs;

#[derive(Clone, Command, Default, Debug, clap::Parser, Serialize, Deserialize, Merge)]
#[serde(default, rename_all = "kebab-case", deny_unknown_fields)]
Expand Down
254 changes: 190 additions & 64 deletions src/commands/webdav/webdavfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::os::unix::ffi::OsStrExt;
use std::{
fmt::{Debug, Formatter},
io::SeekFrom,
sync::{Arc, OnceLock},
sync::OnceLock,
time::SystemTime,
};

Expand All @@ -16,6 +16,7 @@ use dav_server::{
},
};
use futures::FutureExt;
use tokio::sync::{mpsc, oneshot};

use rustic_core::{
repofile::Node,
Expand All @@ -40,6 +41,102 @@ struct DavFsInner<P, S> {
file_policy: FilePolicy,
}

impl<P, S: IndexedFull> DavFsInner<P, S> {
/// Get a [`Node`] from the specified [`DavPath`].
///
/// # Arguments
///
/// * `path` - The path to get the [`Tree`] at
///
/// # Errors
///
/// * If the [`Tree`] could not be found
///
/// # Returns
///
/// The [`Node`] at the specified path
///
/// [`Tree`]: crate::repofile::Tree
fn node_from_path(&self, path: &DavPath) -> Result<Node, FsError> {
self.vfs
.node_from_path(&self.repo, &path.as_pathbuf())
.map_err(|_| FsError::GeneralFailure)
}

/// Get a list of [`Node`]s from the specified directory path.
///
/// # Arguments
///
/// * `path` - The path to get the [`Tree`] at
///
/// # Errors
///
/// * If the [`Tree`] could not be found
///
/// # Returns
///
/// The list of [`Node`]s at the specified path
///
/// [`Tree`]: crate::repofile::Tree
fn dir_entries_from_path(&self, path: &DavPath) -> Result<Vec<Node>, FsError> {
self.vfs
.dir_entries_from_path(&self.repo, &path.as_pathbuf())
.map_err(|_| FsError::GeneralFailure)
}

fn open(&self, node: &Node, options: OpenOptions) -> Result<OpenFile, FsError> {
if options.write
|| options.append
|| options.truncate
|| options.create
|| options.create_new
{
return Err(FsError::Forbidden);
}

if matches!(self.file_policy, FilePolicy::Forbidden) {
return Err(FsError::Forbidden);
}

let open = self
.repo
.open_file(node)
.map_err(|_err| FsError::GeneralFailure)?;
Ok(open)
}

fn read_bytes(
&self,
file: OpenFile,
seek: usize,
count: usize,
) -> Result<(Bytes, OpenFile), FsError> {
let data = self
.repo
.read_file_at(&file, seek, count)
.map_err(|_err| FsError::GeneralFailure)?;
Ok((data, file))
}
}

/// Messages used
#[allow(clippy::large_enum_variant)]
enum DavFsInnerCommand {
Node(DavPath, oneshot::Sender<Result<Node, FsError>>),
DirEntries(DavPath, oneshot::Sender<Result<Vec<Node>, FsError>>),
Open(
Node,
OpenOptions,
oneshot::Sender<Result<OpenFile, FsError>>,
),
ReadBytes(
OpenFile,
usize,
usize,
oneshot::Sender<Result<(Bytes, OpenFile), FsError>>,
),
}

impl<P, S> Debug for DavFsInner<P, S> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(f, "DavFS")
Expand All @@ -50,12 +147,12 @@ impl<P, S> Debug for DavFsInner<P, S> {
///
/// This is the main entry point for the DAV filesystem.
/// It implements [`DavFileSystem`] and can be used to serve a [`Repository`] via DAV.
#[derive(Debug)]
pub struct WebDavFS<P, S> {
inner: Arc<DavFsInner<P, S>>,
#[derive(Debug, Clone)]
pub struct WebDavFS {
send: mpsc::Sender<DavFsInnerCommand>,
}

impl<P, S: IndexedFull> WebDavFS<P, S> {
impl WebDavFS {
/// Create a new [`WebDavFS`] instance.
///
/// # Arguments
Expand All @@ -67,16 +164,44 @@ impl<P, S: IndexedFull> WebDavFS<P, S> {
/// # Returns
///
/// A new [`WebDavFS`] instance
pub(crate) fn new(repo: Repository<P, S>, vfs: Vfs, file_policy: FilePolicy) -> Self {
pub(crate) fn new<P: Send + 'static, S: IndexedFull + Send + 'static>(
repo: Repository<P, S>,
vfs: Vfs,
file_policy: FilePolicy,
) -> Self {
let inner = DavFsInner {
repo,
vfs,
file_policy,
};

Self {
inner: Arc::new(inner),
}
let (send, mut rcv) = mpsc::channel(1);

let _ = std::thread::spawn(move || -> Result<_, FsError> {
while let Some(task) = rcv.blocking_recv() {
match task {
DavFsInnerCommand::Node(path, res) => {
res.send(inner.node_from_path(&path))
.map_err(|_err| FsError::GeneralFailure)?;
}
DavFsInnerCommand::DirEntries(path, res) => {
res.send(inner.dir_entries_from_path(&path))
.map_err(|_err| FsError::GeneralFailure)?;
}
DavFsInnerCommand::Open(path, open_options, res) => {
res.send(inner.open(&path, open_options))
.map_err(|_err| FsError::GeneralFailure)?;
}
DavFsInnerCommand::ReadBytes(file, seek, count, res) => {
res.send(inner.read_bytes(file, seek, count))
.map_err(|_err| FsError::GeneralFailure)?;
}
}
}
Ok(())
});

Self { send }
}

/// Get a [`Node`] from the specified [`DavPath`].
Expand All @@ -94,11 +219,13 @@ impl<P, S: IndexedFull> WebDavFS<P, S> {
/// The [`Node`] at the specified path
///
/// [`Tree`]: crate::repofile::Tree
fn node_from_path(&self, path: &DavPath) -> Result<Node, FsError> {
self.inner
.vfs
.node_from_path(&self.inner.repo, &path.as_pathbuf())
.map_err(|_| FsError::GeneralFailure)
async fn node_from_path(&self, path: &DavPath) -> Result<Node, FsError> {
let (send, rcv) = oneshot::channel();
self.send
.send(DavFsInnerCommand::Node(path.clone(), send))
.await
.map_err(|_err| FsError::GeneralFailure)?;
rcv.await.map_err(|_err| FsError::GeneralFailure)?
}

/// Get a list of [`Node`]s from the specified directory path.
Expand All @@ -116,32 +243,46 @@ impl<P, S: IndexedFull> WebDavFS<P, S> {
/// The list of [`Node`]s at the specified path
///
/// [`Tree`]: crate::repofile::Tree
fn dir_entries_from_path(&self, path: &DavPath) -> Result<Vec<Node>, FsError> {
self.inner
.vfs
.dir_entries_from_path(&self.inner.repo, &path.as_pathbuf())
.map_err(|_| FsError::GeneralFailure)
async fn dir_entries_from_path(&self, path: &DavPath) -> Result<Vec<Node>, FsError> {
let (send, rcv) = oneshot::channel();
self.send
.send(DavFsInnerCommand::DirEntries(path.clone(), send))
.await
.map_err(|_err| FsError::GeneralFailure)?;
rcv.await.map_err(|_err| FsError::GeneralFailure)?
}
}

impl<P, S: IndexedFull> Clone for WebDavFS<P, S> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
async fn open(&self, node: &Node, options: OpenOptions) -> Result<OpenFile, FsError> {
let (send, rcv) = oneshot::channel();
self.send
.send(DavFsInnerCommand::Open(node.clone(), options, send))
.await
.map_err(|_err| FsError::GeneralFailure)?;
rcv.await.map_err(|_err| FsError::GeneralFailure)?
}
async fn read_bytes(
&self,
file: OpenFile,
seek: usize,
count: usize,
) -> Result<(Bytes, OpenFile), FsError> {
let (send, rcv) = oneshot::channel();
self.send
.send(DavFsInnerCommand::ReadBytes(file, seek, count, send))
.await
.map_err(|_err| FsError::GeneralFailure)?;
rcv.await.map_err(|_err| FsError::GeneralFailure)?
}
}

impl<P: Debug + Send + Sync + 'static, S: IndexedFull + Debug + Send + Sync + 'static> DavFileSystem
for WebDavFS<P, S>
{
impl DavFileSystem for WebDavFS {
fn metadata<'a>(&'a self, davpath: &'a DavPath) -> FsFuture<'_, Box<dyn DavMetaData>> {
self.symlink_metadata(davpath)
}

fn symlink_metadata<'a>(&'a self, davpath: &'a DavPath) -> FsFuture<'_, Box<dyn DavMetaData>> {
async move {
let node = self.node_from_path(davpath)?;
let node = self.node_from_path(davpath).await?;
let meta: Box<dyn DavMetaData> = Box::new(DavFsMetaData(node));
Ok(meta)
}
Expand All @@ -154,7 +295,7 @@ impl<P: Debug + Send + Sync + 'static, S: IndexedFull + Debug + Send + Sync + 's
_meta: ReadDirMeta,
) -> FsFuture<'_, FsStream<Box<dyn DavDirEntry>>> {
async move {
let entries = self.dir_entries_from_path(davpath)?;
let entries = self.dir_entries_from_path(davpath).await?;
let entry_iter = entries.into_iter().map(|e| {
let entry: Box<dyn DavDirEntry> = Box::new(DavFsDirEntry(e));
Ok(entry)
Expand All @@ -171,30 +312,13 @@ impl<P: Debug + Send + Sync + 'static, S: IndexedFull + Debug + Send + Sync + 's
options: OpenOptions,
) -> FsFuture<'_, Box<dyn DavFile>> {
async move {
if options.write
|| options.append
|| options.truncate
|| options.create
|| options.create_new
{
return Err(FsError::Forbidden);
}

let node = self.node_from_path(path)?;
if matches!(self.inner.file_policy, FilePolicy::Forbidden) {
return Err(FsError::Forbidden);
}

let open = self
.inner
.repo
.open_file(&node)
.map_err(|_err| FsError::GeneralFailure)?;
let node = self.node_from_path(path).await?;
let file = self.open(&node, options).await?;
let file: Box<dyn DavFile> = Box::new(DavFsFile {
node,
open,
fs: self.inner.clone(),
open: Some(file),
seek: 0,
fs: self.clone(),
node,
});
Ok(file)
}
Expand Down Expand Up @@ -234,27 +358,25 @@ impl DavDirEntry for DavFsDirEntry {
/// A [`DavFile`] implementation for [`Node`]s.
///
/// This is a read-only file.
struct DavFsFile<P, S> {
struct DavFsFile {
/// The [`Node`] this file is for
node: Node,

/// The [`OpenFile`] for this file
open: OpenFile,

/// The [`DavFsInner`] this file belongs to
fs: Arc<DavFsInner<P, S>>,
open: Option<OpenFile>,

/// The current seek position
seek: usize,
fs: WebDavFS,
}

impl<P, S> Debug for DavFsFile<P, S> {
impl Debug for DavFsFile {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(f, "DavFile")
}
}

impl<P: Debug + Send + Sync, S: IndexedFull + Debug + Send + Sync> DavFile for DavFsFile<P, S> {
impl DavFile for DavFsFile {
fn metadata(&mut self) -> FsFuture<'_, Box<dyn DavMetaData>> {
async move {
let meta: Box<dyn DavMetaData> = Box::new(DavFsMetaData(self.node.clone()));
Expand All @@ -273,12 +395,16 @@ impl<P: Debug + Send + Sync, S: IndexedFull + Debug + Send + Sync> DavFile for D

fn read_bytes(&mut self, count: usize) -> FsFuture<'_, Bytes> {
async move {
let data = self
let (data, open) = self
.fs
.repo
.read_file_at(&self.open, self.seek, count)
.map_err(|_err| FsError::GeneralFailure)?;
.read_bytes(
self.open.take().ok_or(FsError::GeneralFailure)?,
self.seek,
count,
)
.await?;
self.seek += data.len();
self.open = Some(open);
Ok(data)
}
.boxed()
Expand All @@ -292,7 +418,7 @@ impl<P: Debug + Send + Sync, S: IndexedFull + Debug + Send + Sync> DavFile for D
}
SeekFrom::Current(delta) => {
self.seek = usize::try_from(
i64::try_from(self.seek).expect("i64 wrapped around") + delta,
i64::try_from(self.seek).expect("i64 should not wrap around") + delta,
)
.expect("usize overflow should not happen");
}
Expand Down
Loading